mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Correlations: Allow creating correlations for provisioned data sources (#73737)
* Allow creating correlations for provisioned data sources * Update docs * Fix linting * Add missing props * Add missing props * Fix linting * Fix linting * Clarify error name * Removed error handling for a non-existing use case * Create a list of deleted data datasources based on all configs * Add org_id to correlations * Add tests * Allow org_id to be null in case org_id=0 is used * Create organization to ensure stable id is generated * Fix linting * Ensure backwards compatibility * Add deprecation information * Update comments * Override existing datasSource variable so the UID is retrieved correctly * Migrate correlations indices * Default org_id when migrating * Remove redundant default * Make PK non-nullable * Post merge fixes * Separate data sources / correlations provisioning * Adjust comments * Store new data sources in spy store so it can be used to test correlations as well * Fix linting * Update tests * Ensure response is closed * Avoid creating duplicates during provisioning * Fix updating provisioned column and update tests * Rename error message * Fix linting errors * Fix linting errors and rename variable * Update test * Update pkg/services/sqlstore/migrations/correlations_mig.go Co-authored-by: Giordano Ricci <me@giordanoricci.com> * Remove unused error * Fix lining --------- Co-authored-by: Giordano Ricci <me@giordanoricci.com>
This commit is contained in:
@@ -5,12 +5,11 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/correlations"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
type Store interface {
|
||||
@@ -24,6 +23,7 @@ type CorrelationsStore interface {
|
||||
DeleteCorrelationsByTargetUID(ctx context.Context, cmd correlations.DeleteCorrelationsByTargetUIDCommand) error
|
||||
DeleteCorrelationsBySourceUID(ctx context.Context, cmd correlations.DeleteCorrelationsBySourceUIDCommand) error
|
||||
CreateCorrelation(ctx context.Context, cmd correlations.CreateCorrelationCommand) (correlations.Correlation, error)
|
||||
CreateOrUpdateCorrelation(ctx context.Context, cmd correlations.CreateCorrelationCommand) error
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -57,13 +57,11 @@ func newDatasourceProvisioner(log log.Logger, store Store, correlationsStore Cor
|
||||
}
|
||||
}
|
||||
|
||||
func (dc *DatasourceProvisioner) apply(ctx context.Context, cfg *configs) error {
|
||||
if err := dc.deleteDatasources(ctx, cfg.DeleteDatasources); err != nil {
|
||||
func (dc *DatasourceProvisioner) provisionDataSources(ctx context.Context, cfg *configs, willExistAfterProvisioning map[DataSourceMapKey]bool) error {
|
||||
if err := dc.deleteDatasources(ctx, cfg.DeleteDatasources, willExistAfterProvisioning); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
correlationsToInsert := make([]correlations.CreateCorrelationCommand, 0)
|
||||
|
||||
for _, ds := range cfg.Datasources {
|
||||
cmd := &datasources.GetDataSourceQuery{OrgID: ds.OrgID, Name: ds.Name}
|
||||
dataSource, err := dc.store.GetDataSource(ctx, cmd)
|
||||
@@ -74,63 +72,91 @@ func (dc *DatasourceProvisioner) apply(ctx context.Context, cfg *configs) error
|
||||
if errors.Is(err, datasources.ErrDataSourceNotFound) {
|
||||
insertCmd := createInsertCommand(ds)
|
||||
dc.log.Info("inserting datasource from configuration ", "name", insertCmd.Name, "uid", insertCmd.UID)
|
||||
dataSource, err := dc.store.AddDataSource(ctx, insertCmd)
|
||||
_, err = dc.store.AddDataSource(ctx, insertCmd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, correlation := range ds.Correlations {
|
||||
if insertCorrelationCmd, err := makeCreateCorrelationCommand(correlation, dataSource.UID, insertCmd.OrgID); err == nil {
|
||||
correlationsToInsert = append(correlationsToInsert, insertCorrelationCmd)
|
||||
} else {
|
||||
dc.log.Error("failed to parse correlation", "correlation", correlation)
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
updateCmd := createUpdateCommand(ds, dataSource.ID)
|
||||
dc.log.Debug("updating datasource from configuration", "name", updateCmd.Name, "uid", updateCmd.UID)
|
||||
if _, err := dc.store.UpdateDataSource(ctx, updateCmd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(ds.Correlations) > 0 {
|
||||
if err := dc.correlationsStore.DeleteCorrelationsBySourceUID(ctx, correlations.DeleteCorrelationsBySourceUIDCommand{
|
||||
SourceUID: dataSource.UID,
|
||||
OrgId: dataSource.OrgID,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, correlation := range ds.Correlations {
|
||||
if insertCorrelationCmd, err := makeCreateCorrelationCommand(correlation, dataSource.UID, updateCmd.OrgID); err == nil {
|
||||
correlationsToInsert = append(correlationsToInsert, insertCorrelationCmd)
|
||||
} else {
|
||||
dc.log.Error("failed to parse correlation", "correlation", correlation)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, createCorrelationCmd := range correlationsToInsert {
|
||||
if _, err := dc.correlationsStore.CreateCorrelation(ctx, createCorrelationCmd); err != nil {
|
||||
return fmt.Errorf("err=%s source=%s", err.Error(), createCorrelationCmd.SourceUID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dc *DatasourceProvisioner) provisionCorrelations(ctx context.Context, cfg *configs) error {
|
||||
for _, ds := range cfg.Datasources {
|
||||
cmd := &datasources.GetDataSourceQuery{OrgID: ds.OrgID, Name: ds.Name}
|
||||
dataSource, err := dc.store.GetDataSource(ctx, cmd)
|
||||
|
||||
if errors.Is(err, datasources.ErrDataSourceNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := dc.correlationsStore.DeleteCorrelationsBySourceUID(ctx, correlations.DeleteCorrelationsBySourceUIDCommand{
|
||||
SourceUID: dataSource.UID,
|
||||
OrgId: dataSource.OrgID,
|
||||
OnlyProvisioned: true,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, correlation := range ds.Correlations {
|
||||
createCorrelationCmd, err := makeCreateCorrelationCommand(correlation, dataSource.UID, dataSource.OrgID)
|
||||
if err != nil {
|
||||
dc.log.Error("failed to parse correlation", "correlation", correlation)
|
||||
return err
|
||||
}
|
||||
// "Provisioned" column was introduced in #71110. Any records that were created before this change
|
||||
// are marked as "not provisioned". To avoid duplicates we ensure these records are updated instead
|
||||
// of being inserted once again with Provisioned=true.
|
||||
// This is required to help users upgrade with confidence. Post GA we do not expect this code to be
|
||||
// needed at all as it should result in a no-op. This should be mentioned in what's new docs when
|
||||
// feature becomes GA.
|
||||
// This can be changed to dc.correlationsStore.CreateCorrelation in Grafana 11 and CreateOrUpdateCorrelation
|
||||
// can be removed.
|
||||
if err := dc.correlationsStore.CreateOrUpdateCorrelation(ctx, createCorrelationCmd); err != nil {
|
||||
return fmt.Errorf("err=%s source=%s", err.Error(), createCorrelationCmd.SourceUID)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type DataSourceMapKey struct {
|
||||
Name string
|
||||
OrgId int64
|
||||
}
|
||||
|
||||
func (dc *DatasourceProvisioner) applyChanges(ctx context.Context, configPath string) error {
|
||||
configs, err := dc.cfgProvider.readConfig(ctx, configPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Creates a list of data sources that will be ultimately deleted after provisioning finishes
|
||||
willExistAfterProvisioning := map[DataSourceMapKey]bool{}
|
||||
for _, cfg := range configs {
|
||||
if err := dc.apply(ctx, cfg); err != nil {
|
||||
for _, ds := range cfg.DeleteDatasources {
|
||||
willExistAfterProvisioning[DataSourceMapKey{Name: ds.Name, OrgId: ds.OrgID}] = false
|
||||
}
|
||||
for _, ds := range cfg.Datasources {
|
||||
willExistAfterProvisioning[DataSourceMapKey{Name: ds.Name, OrgId: ds.OrgID}] = true
|
||||
}
|
||||
}
|
||||
|
||||
for _, cfg := range configs {
|
||||
if err := dc.provisionDataSources(ctx, cfg, willExistAfterProvisioning); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, cfg := range configs {
|
||||
if err := dc.provisionCorrelations(ctx, cfg); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -141,11 +167,11 @@ func (dc *DatasourceProvisioner) applyChanges(ctx context.Context, configPath st
|
||||
func makeCreateCorrelationCommand(correlation map[string]any, SourceUID string, OrgId int64) (correlations.CreateCorrelationCommand, error) {
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
createCommand := correlations.CreateCorrelationCommand{
|
||||
SourceUID: SourceUID,
|
||||
Label: correlation["label"].(string),
|
||||
Description: correlation["description"].(string),
|
||||
OrgId: OrgId,
|
||||
SkipReadOnlyCheck: true,
|
||||
SourceUID: SourceUID,
|
||||
Label: correlation["label"].(string),
|
||||
Description: correlation["description"].(string),
|
||||
OrgId: OrgId,
|
||||
Provisioned: true,
|
||||
}
|
||||
|
||||
targetUID, ok := correlation["targetUID"].(string)
|
||||
@@ -182,37 +208,23 @@ func makeCreateCorrelationCommand(correlation map[string]any, SourceUID string,
|
||||
return createCommand, nil
|
||||
}
|
||||
|
||||
func (dc *DatasourceProvisioner) deleteDatasources(ctx context.Context, dsToDelete []*deleteDatasourceConfig) error {
|
||||
func (dc *DatasourceProvisioner) deleteDatasources(ctx context.Context, dsToDelete []*deleteDatasourceConfig, willExistAfterProvisioning map[DataSourceMapKey]bool) error {
|
||||
for _, ds := range dsToDelete {
|
||||
cmd := &datasources.DeleteDataSourceCommand{OrgID: ds.OrgID, Name: ds.Name}
|
||||
getDsQuery := &datasources.GetDataSourceQuery{Name: ds.Name, OrgID: ds.OrgID}
|
||||
dataSource, err := dc.store.GetDataSource(ctx, getDsQuery)
|
||||
_, err := dc.store.GetDataSource(ctx, getDsQuery)
|
||||
|
||||
if err != nil && !errors.Is(err, datasources.ErrDataSourceNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Skip publishing the event as the data source is not really deleted, it will be re-created during provisioning
|
||||
// This is to avoid cleaning up any resources related to the data source (e.g. correlations)
|
||||
skipPublish := willExistAfterProvisioning[DataSourceMapKey{Name: ds.Name, OrgId: ds.OrgID}]
|
||||
cmd := &datasources.DeleteDataSourceCommand{OrgID: ds.OrgID, Name: ds.Name, SkipPublish: skipPublish}
|
||||
if err := dc.store.DeleteDataSource(ctx, cmd); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if dataSource != nil {
|
||||
if err := dc.correlationsStore.DeleteCorrelationsBySourceUID(ctx, correlations.DeleteCorrelationsBySourceUIDCommand{
|
||||
SourceUID: dataSource.UID,
|
||||
OrgId: dataSource.OrgID,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := dc.correlationsStore.DeleteCorrelationsByTargetUID(ctx, correlations.DeleteCorrelationsByTargetUIDCommand{
|
||||
TargetUID: dataSource.UID,
|
||||
OrgId: dataSource.OrgID,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dc.log.Info("deleted correlations based on configuration", "ds_name", ds.Name)
|
||||
}
|
||||
|
||||
if cmd.DeletedDatasourcesCount > 0 {
|
||||
dc.log.Info("deleted datasource based on configuration", "name", ds.Name)
|
||||
}
|
||||
|
Reference in New Issue
Block a user