Correlations: Add organization id (#72258)

* Add org_id to correlations

* Add tests

* Allow org_id to be null in case org_id=0 is used

* Create organization to ensure stable id is generated

* Fix linting

* Ensure backwards compatibility

* Add deprecation information

* Migrate correlations indices

* Default org_id when migrating

* Remove redundant default

* Make PK non-nullable
This commit is contained in:
Piotr Jamróz 2023-08-24 09:39:30 +02:00 committed by GitHub
parent 9b891480d6
commit b30e0aa5aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 153 additions and 7 deletions

View File

@ -110,12 +110,14 @@ func (s CorrelationsService) handleDatasourceDeletion(ctx context.Context, event
return s.SQLStore.InTransaction(ctx, func(ctx context.Context) error {
if err := s.deleteCorrelationsBySourceUID(ctx, DeleteCorrelationsBySourceUIDCommand{
SourceUID: event.UID,
OrgId: event.OrgID,
}); err != nil {
return err
}
if err := s.deleteCorrelationsByTargetUID(ctx, DeleteCorrelationsByTargetUIDCommand{
TargetUID: event.UID,
OrgId: event.OrgID,
}); err != nil {
return err
}

View File

@ -13,6 +13,7 @@ import (
func (s CorrelationsService) createCorrelation(ctx context.Context, cmd CreateCorrelationCommand) (Correlation, error) {
correlation := Correlation{
UID: util.GenerateShortUID(),
OrgID: cmd.OrgId,
SourceUID: cmd.SourceUID,
TargetUID: cmd.TargetUID,
Label: cmd.Label,
@ -87,6 +88,7 @@ func (s CorrelationsService) updateCorrelation(ctx context.Context, cmd UpdateCo
correlation := Correlation{
UID: cmd.UID,
SourceUID: cmd.SourceUID,
OrgID: cmd.OrgId,
}
err := s.SQLStore.WithTransactionalDbSession(ctx, func(session *db.Session) error {
@ -164,7 +166,8 @@ func (s CorrelationsService) getCorrelation(ctx context.Context, cmd GetCorrelat
return ErrSourceDataSourceDoesNotExists
}
found, err := session.Select("correlation.*").Join("", "data_source AS dss", "correlation.source_uid = dss.uid and dss.org_id = ?", cmd.OrgId).Join("", "data_source AS dst", "correlation.target_uid = dst.uid and dst.org_id = ?", cmd.OrgId).Where("correlation.uid = ? AND correlation.source_uid = ?", correlation.UID, correlation.SourceUID).Get(&correlation)
// Correlations created before the fix #72498 may have org_id = 0, but it's deprecated and will be removed in #72325
found, err := session.Select("correlation.*").Join("", "data_source AS dss", "correlation.source_uid = dss.uid and (correlation.org_id = 0 or dss.org_id = correlation.org_id) and dss.org_id = ?", cmd.OrgId).Join("", "data_source AS dst", "correlation.target_uid = dst.uid and dst.org_id = ?", cmd.OrgId).Where("correlation.uid = ? AND correlation.source_uid = ?", correlation.UID, correlation.SourceUID).Get(&correlation)
if !found {
return ErrCorrelationNotFound
}
@ -214,8 +217,8 @@ func (s CorrelationsService) getCorrelationsBySourceUID(ctx context.Context, cmd
if _, err := s.DataSourceService.GetDataSource(ctx, query); err != nil {
return ErrSourceDataSourceDoesNotExists
}
return session.Select("correlation.*").Join("", "data_source AS dss", "correlation.source_uid = dss.uid and dss.org_id = ?", cmd.OrgId).Join("", "data_source AS dst", "correlation.target_uid = dst.uid and dst.org_id = ?", cmd.OrgId).Where("correlation.source_uid = ?", cmd.SourceUID).Find(&correlations)
// Correlations created before the fix #72498 may have org_id = 0, but it's deprecated and will be removed in #72325
return session.Select("correlation.*").Join("", "data_source AS dss", "correlation.source_uid = dss.uid and (correlation.org_id = 0 or dss.org_id = correlation.org_id) and dss.org_id = ?", cmd.OrgId).Join("", "data_source AS dst", "correlation.target_uid = dst.uid and dst.org_id = ?", cmd.OrgId).Where("correlation.source_uid = ?", cmd.SourceUID).Find(&correlations)
})
if err != nil {
@ -235,7 +238,8 @@ func (s CorrelationsService) getCorrelations(ctx context.Context, cmd GetCorrela
err := s.SQLStore.WithDbSession(ctx, func(session *db.Session) error {
offset := cmd.Limit * (cmd.Page - 1)
q := session.Select("correlation.*").Join("", "data_source AS dss", "correlation.source_uid = dss.uid and dss.org_id = ?", cmd.OrgId).Join("", "data_source AS dst", "correlation.target_uid = dst.uid and dst.org_id = ?", cmd.OrgId)
// Correlations created before the fix #72498 may have org_id = 0, but it's deprecated and will be removed in #72325
q := session.Select("correlation.*").Join("", "data_source AS dss", "correlation.source_uid = dss.uid and (correlation.org_id = 0 or dss.org_id = correlation.org_id) and dss.org_id = ? ", cmd.OrgId).Join("", "data_source AS dst", "correlation.target_uid = dst.uid and dst.org_id = ?", cmd.OrgId)
if len(cmd.SourceUIDs) > 0 {
q.In("dss.uid", cmd.SourceUIDs)
@ -265,14 +269,16 @@ func (s CorrelationsService) getCorrelations(ctx context.Context, cmd GetCorrela
func (s CorrelationsService) deleteCorrelationsBySourceUID(ctx context.Context, cmd DeleteCorrelationsBySourceUIDCommand) error {
return s.SQLStore.WithDbSession(ctx, func(session *db.Session) error {
_, err := session.Delete(&Correlation{SourceUID: cmd.SourceUID})
// Correlations created before the fix #72498 may have org_id = 0, but it's deprecated and will be removed in #72325
_, err := session.Where("source_uid = ? and (org_id = ? or org_id = 0)", cmd.SourceUID, cmd.OrgId).Delete(&Correlation{})
return err
})
}
func (s CorrelationsService) deleteCorrelationsByTargetUID(ctx context.Context, cmd DeleteCorrelationsByTargetUIDCommand) error {
return s.SQLStore.WithDbSession(ctx, func(session *db.Session) error {
_, err := session.Delete(&Correlation{TargetUID: &cmd.TargetUID})
// Correlations created before the fix #72498 may have org_id = 0, but it's deprecated and will be removed in #72325
_, err := session.Where("source_uid = ? and (org_id = ? or org_id = 0)", cmd.TargetUID, cmd.OrgId).Delete(&Correlation{})
return err
})
}

View File

@ -109,6 +109,9 @@ type Correlation struct {
// UID of the data source the correlation originates from
// example: d0oxYRg4z
SourceUID string `json:"sourceUID" xorm:"pk 'source_uid'"`
// OrgID of the data source the correlation originates from
// Example: 1
OrgID int64 `json:"orgId" xorm:"pk 'org_id'"`
// UID of the data source the correlation points to
// example: PE1C5CBDA0504A6A3
TargetUID *string `json:"targetUID" xorm:"target_uid"`
@ -286,8 +289,10 @@ type GetCorrelationsQuery struct {
type DeleteCorrelationsBySourceUIDCommand struct {
SourceUID string
OrgId int64
}
type DeleteCorrelationsByTargetUIDCommand struct {
TargetUID string
OrgId int64
}

View File

@ -28,7 +28,8 @@ var (
withoutDefaults = "testdata/appliedDefaults"
invalidAccess = "testdata/invalid-access"
oneDatasourceWithTwoCorrelations = "testdata/one-datasource-two-correlations"
oneDatasourceWithTwoCorrelations = "testdata/one-datasource-two-correlations"
correlationsDifferentOrganizations = "testdata/correlations-different-organizations"
)
func TestDatasourceAsConfig(t *testing.T) {
@ -282,6 +283,26 @@ func TestDatasourceAsConfig(t *testing.T) {
require.Equal(t, 1, len(correlationsStore.deletedBySourceUID))
require.Equal(t, 1, len(correlationsStore.deletedByTargetUID))
})
t.Run("Using correct organization id", func(t *testing.T) {
store := &spyStore{items: []*datasources.DataSource{{Name: "Foo", OrgID: 2, ID: 1}}}
orgFake := &orgtest.FakeOrgService{}
correlationsStore := &mockCorrelationsStore{}
dc := newDatasourceProvisioner(logger, store, correlationsStore, orgFake)
err := dc.applyChanges(context.Background(), correlationsDifferentOrganizations)
if err != nil {
t.Fatalf("applyChanges return an error %v", err)
}
require.Equal(t, 2, len(correlationsStore.created))
// triggered twice - clean up on delete + update (because of the store setup above)
require.Equal(t, 2, len(correlationsStore.deletedBySourceUID))
require.Equal(t, int64(2), correlationsStore.deletedBySourceUID[0].OrgId)
require.Equal(t, int64(2), correlationsStore.deletedBySourceUID[1].OrgId)
// triggered once - just the clean up
require.Equal(t, 1, len(correlationsStore.deletedByTargetUID))
require.Equal(t, int64(2), correlationsStore.deletedByTargetUID[0].OrgId)
})
})
}

View File

@ -97,6 +97,7 @@ func (dc *DatasourceProvisioner) apply(ctx context.Context, cfg *configs) error
if len(ds.Correlations) > 0 {
if err := dc.correlationsStore.DeleteCorrelationsBySourceUID(ctx, correlations.DeleteCorrelationsBySourceUIDCommand{
SourceUID: dataSource.UID,
OrgId: dataSource.OrgID,
}); err != nil {
return err
}
@ -197,12 +198,14 @@ func (dc *DatasourceProvisioner) deleteDatasources(ctx context.Context, dsToDele
if dataSource != nil {
if err := dc.correlationsStore.DeleteCorrelationsBySourceUID(ctx, correlations.DeleteCorrelationsBySourceUIDCommand{
SourceUID: dataSource.UID,
OrgId: dataSource.OrgID,
}); err != nil {
return err
}
if err := dc.correlationsStore.DeleteCorrelationsByTargetUID(ctx, correlations.DeleteCorrelationsByTargetUIDCommand{
TargetUID: dataSource.UID,
OrgId: dataSource.OrgID,
}); err != nil {
return err
}

View File

@ -0,0 +1,6 @@
apiVersion: 1
datasources:
- name: Foo
orgId: 1
type: graphite

View File

@ -0,0 +1,15 @@
apiVersion: 1
deleteDatasources:
- name: Foo
orgId: 2
datasources:
- name: Foo
uid: foo
orgId: 2
type: graphite
correlations:
- targetUID: foo
label: self correlation
description: a description

View File

@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: Foo
uid: foo
orgId: 3
type: graphite
correlations:
- targetUID: foo
label: self correlation
description: a description

View File

@ -29,4 +29,34 @@ func addCorrelationsMigrations(mg *Migrator) {
mg.AddMigration("add correlation config column", NewAddColumnMigration(correlationsV1, &Column{
Name: "config", Type: DB_Text, Nullable: true,
}))
// v2: adding org_id column and recreating indices
correlationsV2 := Table{
Name: "correlation",
Columns: []*Column{
{Name: "uid", Type: DB_NVarchar, Length: 40, Nullable: false, IsPrimaryKey: true},
// All existing records will have '0' assigned
{Name: "org_id", Type: DB_BigInt, IsPrimaryKey: true, Default: "0"},
{Name: "source_uid", Type: DB_NVarchar, Length: 40, Nullable: false, IsPrimaryKey: true},
// Nullable because in the future we want to have correlations to external resources
{Name: "target_uid", Type: DB_NVarchar, Length: 40, Nullable: true},
{Name: "label", Type: DB_Text, Nullable: false},
{Name: "description", Type: DB_Text, Nullable: false},
{Name: "config", Type: DB_Text, Nullable: true},
},
Indices: []*Index{
{Cols: []string{"uid"}},
{Cols: []string{"source_uid"}},
{Cols: []string{"org_id"}},
},
}
addTableReplaceMigrations(mg, correlationsV1, correlationsV2, 2, map[string]string{
"uid": "uid",
"source_uid": "source_uid",
"target_uid": "target_uid",
"label": "label",
"description": "description",
"config": "config",
})
}

View File

@ -134,6 +134,18 @@ func (c TestContext) getURL(url string, user User) string {
)
}
func (c TestContext) createOrg(name string) int64 {
c.t.Helper()
store := c.env.SQLStore
store.Cfg.AutoAssignOrg = false
quotaService := quotaimpl.ProvideService(store, store.Cfg)
orgService, err := orgimpl.ProvideService(store, store.Cfg, quotaService)
require.NoError(c.t, err)
orgId, err := orgService.GetOrCreate(context.Background(), name)
require.NoError(c.t, err)
return orgId
}
func (c TestContext) createUser(cmd user.CreateUserCommand) User {
c.t.Helper()
store := c.env.SQLStore

View File

@ -29,6 +29,14 @@ func TestIntegrationReadCorrelation(t *testing.T) {
Login: "admin",
})
otherOrgId := ctx.createOrg("New organization")
otherOrgUser := ctx.createUser(user.CreateUserCommand{
DefaultOrgRole: string(org.RoleAdmin),
Password: "admin2",
Login: "admin2",
OrgID: otherOrgId,
})
viewerUser := ctx.createUser(user.CreateUserCommand{
DefaultOrgRole: string(org.RoleViewer),
Password: "viewer",
@ -86,6 +94,14 @@ func TestIntegrationReadCorrelation(t *testing.T) {
}
dsWithoutCorrelations := ctx.createDs(createDsCommand)
createDsCommand = &datasources.AddDataSourceCommand{
Name: "with-correlations",
UID: dsWithCorrelations.UID, // reuse UID
Type: "loki",
OrgID: otherOrgId,
}
ctx.createDs(createDsCommand)
// This creates 2 records in the correlation table that should never be returned by the API.
// Given all tests in this file work on the assumption that only a single correlation exists,
// this covers the case where bad data exists in the database.
@ -157,6 +173,25 @@ func TestIntegrationReadCorrelation(t *testing.T) {
require.NoError(t, res.Body.Close())
})
t.Run("Should correctly return correlations for current organization", func(t *testing.T) {
res := ctx.Get(GetParams{
url: "/api/datasources/correlations",
user: otherOrgUser,
})
require.Equal(t, http.StatusOK, res.StatusCode)
responseBody, err := io.ReadAll(res.Body)
require.NoError(t, err)
var response correlations.GetCorrelationsResponseBody
err = json.Unmarshal(responseBody, &response)
require.NoError(t, err)
require.Len(t, response.Correlations, 0)
require.NoError(t, res.Body.Close())
})
})
t.Run("Get all correlations for a given data source", func(t *testing.T) {