make correlations a separate table

This commit is contained in:
Elfo404 2022-07-12 16:52:01 +02:00
parent e1da3d5391
commit 83814af498
No known key found for this signature in database
GPG Key ID: 586539D9491F0726
23 changed files with 319 additions and 357 deletions

View File

@ -246,7 +246,8 @@ datasources:
type: loki
access: proxy
url: http://localhost:3100
editable: false
editable: true
version: 10
correlations:
- targetUid: gdev-jaeger
label: "Jaeger traces"
@ -255,6 +256,7 @@ datasources:
label: "Zipkin traces"
description: "Related traces stored in Zipkin"
jsonData:
something: here
manageAlerts: false
derivedFields:
- name: "traceID"

View File

@ -323,11 +323,6 @@ func (hs *HTTPServer) registerRoutes() {
datasourceRoute.Get("/uid/:uid", authorize(reqOrgAdmin, ac.EvalPermission(datasources.ActionRead, uidScope)), routing.Wrap(hs.GetDataSourceByUID))
datasourceRoute.Get("/name/:name", authorize(reqOrgAdmin, ac.EvalPermission(datasources.ActionRead, nameScope)), routing.Wrap(hs.GetDataSourceByName))
datasourceRoute.Get("/id/:name", authorize(reqSignedIn, ac.EvalPermission(datasources.ActionIDRead, nameScope)), routing.Wrap(hs.GetDataSourceIdByName))
// Correlations
datasourceRoute.Group("/:uid/correlations", func(correlationsRoute routing.RouteRegister) {
correlationsRoute.Post("/", authorize(reqOrgAdmin, ac.EvalPermission(datasources.ActionWrite, uidScope)), routing.Wrap(hs.CreateCorrelation))
})
})
apiRoute.Get("/plugins", routing.Wrap(hs.GetPluginList))

View File

@ -429,41 +429,6 @@ func (hs *HTTPServer) GetDataSourceIdByName(c *models.ReqContext) response.Respo
return response.JSON(http.StatusOK, &dtos)
}
// Post /api/datasources/:uid/correlations
func (hs *HTTPServer) CreateCorrelation(c *models.ReqContext) response.Response {
cmd := datasources.CreateCorrelationCommand{
OrgID: c.OrgId,
}
if err := web.Bind(c.Req, &cmd); err != nil {
return response.Error(http.StatusBadRequest, "bad request data", err)
}
cmd.SourceUID = web.Params(c.Req)[":uid"]
err := hs.DataSourcesService.CreateCorrelation(c.Req.Context(), &cmd)
if err != nil {
// TODO: we may want to differentiate the following error between source and target DS
if errors.Is(err, datasources.ErrDataSourceNotFound) {
return response.Error(http.StatusNotFound, "Data source not found", nil)
}
if errors.Is(err, datasources.ErrDatasourceIsReadOnly) {
return response.Error(http.StatusForbidden, "Data source is read only", nil)
}
if errors.Is(err, datasources.ErrCorrelationUidExists) {
return response.Error(http.StatusConflict, fmt.Sprintf("Correlation with uid %s already exists", cmd.Uid), nil)
}
return response.Error(http.StatusInternalServerError, "Failed to add correlation", err)
}
hs.Live.HandleDatasourceUpdate(c.OrgId, cmd.SourceUID)
return response.JSON(http.StatusOK, util.DynMap{
"message": "Correlation created",
"correlation": cmd.Result,
})
}
// /api/datasources/:id/resources/*
func (hs *HTTPServer) CallDatasourceResource(c *models.ReqContext) {
datasourceID, err := strconv.ParseInt(web.Params(c.Req)[":id"], 10, 64)

View File

@ -177,25 +177,6 @@ func TestUpdateDataSource_URLWithoutProtocol(t *testing.T) {
assert.Equal(t, 200, sc.resp.Code)
}
func TestCreateCorrelation(t *testing.T) {
t.Run("Creating a correlation without a targetUID should result in a 400", func(t *testing.T) {
hs := &HTTPServer{
DataSourcesService: &dataSourcesServiceMock{},
}
sc := setupScenarioContext(t, "/api/datasources/1234/correlations")
sc.m.Post(sc.url, routing.Wrap(func(c *models.ReqContext) response.Response {
c.Req.Body = mockRequestBody(datasources.CreateCorrelationCommand{})
return hs.CreateCorrelation(c)
}))
sc.fakeReqWithParams("POST", sc.url, map[string]string{}).exec()
assert.Equal(t, http.StatusBadRequest, sc.resp.Code)
})
}
func TestAPI_Datasources_AccessControl(t *testing.T) {
testDatasource := datasources.DataSource{
Id: 3,
@ -568,7 +549,6 @@ type dataSourcesServiceMock struct {
expectedDatasources []*datasources.DataSource
expectedDatasource *datasources.DataSource
expectedCorrelation *datasources.CorrelationDTO
expectedError error
}
@ -604,11 +584,6 @@ func (m *dataSourcesServiceMock) UpdateDataSource(ctx context.Context, cmd *data
return m.expectedError
}
func (m *dataSourcesServiceMock) CreateCorrelation(ctx context.Context, cmd *datasources.CreateCorrelationCommand) error {
cmd.Result = *m.expectedCorrelation
return m.expectedError
}
func (m *dataSourcesServiceMock) DecryptedValues(ctx context.Context, ds *datasources.DataSource) (map[string]string, error) {
decryptedValues := make(map[string]string)
return decryptedValues, m.expectedError

View File

@ -333,19 +333,6 @@ import (
// 404: notFoundError
// 500: internalServerError
// swagger:route POST /datasources/{uid}/correlations correlations createCorrelation
//
// Creates a correlation.
//
// Responses:
// 200: createCorrelationResponse
// 400: badRequestError
// 401: unauthorisedError
// 403: forbiddenError
// 404: notFoundError
// 409: conflictError
// 500: internalServerError
// swagger:parameters updateDatasourceByID datasourceProxyDELETEcalls
// swagger:parameters checkDatasourceHealthByID fetchDatasourceResourcesByID
type DatasourceID struct {
@ -493,16 +480,6 @@ type UpdateDatasourceByUIDParams struct {
DatasourceUID string `json:"uid"`
}
// swagger:parameters createCorrelation
type CreateCorrelationParams struct {
// in:body
// required:true
Body datasources.CreateCorrelationCommand
// in:path
// required:true
DatasourceUID string `json:"uid"`
}
// swagger:response getDatasourcesResponse
type GetDatasourcesResponse struct {
// The response message
@ -571,18 +548,3 @@ type DeleteDatasourceByNameResponse struct {
Message string `json:"message"`
} `json:"body"`
}
// swagger:response createCorrelationResponse
type CreateCorrelationResponse struct {
// in: body
Body struct {
// Correlation properties
// required: true
Correlation datasources.CorrelationDTO `json:"correlation"`
// Message Message of the created correlation.
// required: true
// example: Correlation created
Message string `json:"message"`
} `json:"body"`
}

View File

@ -63,6 +63,7 @@ import (
pref "github.com/grafana/grafana/pkg/services/preference"
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/services/correlations"
publicdashboardsApi "github.com/grafana/grafana/pkg/services/publicdashboards/api"
"github.com/grafana/grafana/pkg/services/query"
"github.com/grafana/grafana/pkg/services/queryhistory"
@ -120,6 +121,7 @@ type HTTPServer struct {
SearchService search.Service
ShortURLService shorturls.Service
QueryHistoryService queryhistory.Service
CorrelationsService correlations.Service
Live *live.GrafanaLive
LivePushGateway *pushhttp.Gateway
ThumbService thumbs.Service
@ -184,7 +186,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
pluginDashboardService plugindashboards.Service, pluginStore plugins.Store, pluginClient plugins.Client,
pluginErrorResolver plugins.ErrorResolver, pluginManager plugins.Manager, settingsProvider setting.Provider,
dataSourceCache datasources.CacheService, userTokenService models.UserTokenService,
cleanUpService *cleanup.CleanUpService, shortURLService shorturls.Service, queryHistoryService queryhistory.Service,
cleanUpService *cleanup.CleanUpService, shortURLService shorturls.Service, queryHistoryService queryhistory.Service, correlationsService correlations.Service,
thumbService thumbs.Service, remoteCache *remotecache.RemoteCache, provisioningService provisioning.ProvisioningService,
loginService login.Service, authenticator loginpkg.Authenticator, accessControl accesscontrol.AccessControl,
dataSourceProxy *datasourceproxy.DataSourceProxyService, searchService *search.SearchService,
@ -235,6 +237,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
cleanUpService: cleanUpService,
ShortURLService: shortURLService,
QueryHistoryService: queryHistoryService,
CorrelationsService: correlationsService,
Features: features,
ThumbService: thumbService,
StorageService: storageService,

View File

@ -44,6 +44,7 @@ import (
"github.com/grafana/grafana/pkg/services/comments"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/contexthandler/authproxy"
"github.com/grafana/grafana/pkg/services/correlations"
"github.com/grafana/grafana/pkg/services/dashboardimport"
dashboardimportservice "github.com/grafana/grafana/pkg/services/dashboardimport/service"
"github.com/grafana/grafana/pkg/services/dashboards"
@ -179,6 +180,8 @@ var wireBasicSet = wire.NewSet(
wire.Bind(new(shorturls.Service), new(*shorturls.ShortURLService)),
queryhistory.ProvideService,
wire.Bind(new(queryhistory.Service), new(*queryhistory.QueryHistoryService)),
correlations.ProvideService,
wire.Bind(new(correlations.Service), new(*correlations.CorrelationsService)),
quota.ProvideService,
remotecache.ProvideService,
loginservice.ProvideService,

View File

@ -0,0 +1,45 @@
package correlations
import (
"errors"
"net/http"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/web"
)
func (s *CorrelationsService) registerAPIEndpoints() {
// TODO: Add accesscontrol here. permissions should match the ones for the source datasource
s.RouteRegister.Group("/api/datasources/uid/:uid/correlations", func(entities routing.RouteRegister) {
entities.Post("/", middleware.ReqSignedIn, routing.Wrap(s.createHandler))
})
}
// createHandler handles POST /datasources/uid/:uid/correlations
func (s *CorrelationsService) createHandler(c *models.ReqContext) response.Response {
cmd := CreateCorrelationCommand{}
if err := web.Bind(c.Req, &cmd); err != nil {
return response.Error(http.StatusBadRequest, "bad request data", err)
}
cmd.SourceUID = web.Params(c.Req)[":uid"]
cmd.OrgId = c.OrgId
query, err := s.CreateCorrelation(c.Req.Context(), cmd)
if err != nil {
if errors.Is(err, ErrSourceDataSourceDoesNotExists) || errors.Is(err, ErrTargetDataSourceDoesNotExists) {
return response.Error(http.StatusNotFound, "Data source not found", err)
}
if errors.Is(err, ErrSourceDataSourceReadOnly) {
return response.Error(http.StatusForbidden, "Data source is read only", err)
}
return response.Error(http.StatusInternalServerError, "Failed to add correlation", err)
}
return response.JSON(http.StatusOK, CreateCorrelationResponse{Result: query})
}

View File

@ -0,0 +1,43 @@
package correlations
import (
"context"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/sqlstore"
)
func ProvideService(sqlStore *sqlstore.SQLStore, routeRegister routing.RouteRegister, datasourceService datasources.DataSourceService) *CorrelationsService {
s := &CorrelationsService{
SQLStore: sqlStore,
RouteRegister: routeRegister,
log: log.New("correlations"),
datasourceService: datasourceService,
}
s.registerAPIEndpoints()
return s
}
type Service interface {
CreateCorrelation(ctx context.Context, cmd CreateCorrelationCommand) (CorrelationDTO, error)
DeleteCorrelationsBySourceUID(ctx context.Context, cmd DeleteCorrelationsBySourceUIDCommand) error
}
type CorrelationsService struct {
SQLStore *sqlstore.SQLStore
RouteRegister routing.RouteRegister
log log.Logger
datasourceService datasources.DataSourceService
}
func (s CorrelationsService) CreateCorrelation(ctx context.Context, cmd CreateCorrelationCommand) (CorrelationDTO, error) {
return s.createCorrelation(ctx, cmd)
}
func (s CorrelationsService) DeleteCorrelationsBySourceUID(ctx context.Context, cmd DeleteCorrelationsBySourceUIDCommand) error {
return s.deleteCorrelationsBySourceUID(ctx, cmd)
}

View File

@ -0,0 +1,74 @@
package correlations
import (
"context"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/util"
)
// createCorrelation adds a correlation
func (s CorrelationsService) createCorrelation(ctx context.Context, cmd CreateCorrelationCommand) (CorrelationDTO, error) {
correlation := Correlation{
UID: util.GenerateShortUID(),
SourceUID: cmd.SourceUID,
TargetUID: cmd.TargetUID,
Label: cmd.Label,
Description: cmd.Description,
}
err := s.SQLStore.WithTransactionalDbSession(ctx, func(session *sqlstore.DBSession) error {
var err error
query := &datasources.GetDataSourceQuery{
OrgId: cmd.OrgId,
Uid: cmd.SourceUID,
}
if err = s.datasourceService.GetDataSource(ctx, query); err != nil {
return ErrSourceDataSourceDoesNotExists
}
if query.Result.ReadOnly {
return ErrSourceDataSourceReadOnly
}
if err = s.datasourceService.GetDataSource(ctx, &datasources.GetDataSourceQuery{
OrgId: cmd.OrgId,
Uid: cmd.TargetUID,
}); err != nil {
return ErrTargetDataSourceDoesNotExists
}
_, err = session.Insert(correlation)
if err != nil {
return err
}
return nil
})
if err != nil {
return CorrelationDTO{}, err
}
dto := CorrelationDTO{
UID: correlation.UID,
SourceUID: correlation.SourceUID,
TargetUID: correlation.TargetUID,
Label: correlation.Label,
Description: correlation.Description,
}
return dto, nil
}
func (s CorrelationsService) deleteCorrelationsBySourceUID(ctx context.Context, cmd DeleteCorrelationsBySourceUIDCommand) error {
return s.SQLStore.WithDbSession(ctx, func(session *sqlstore.DBSession) error {
return s.SQLStore.InTransaction(ctx, func(ctx context.Context) error {
_, err := session.Delete(&Correlation{SourceUID: cmd.SourceUID})
return err
})
})
}

View File

@ -0,0 +1,52 @@
package correlations
import (
"errors"
)
var (
ErrSourceDataSourceReadOnly = errors.New("source data source is read only")
ErrSourceDataSourceDoesNotExists = errors.New("source data source does not exist")
ErrTargetDataSourceDoesNotExists = errors.New("target data source does not exist")
ErrCorrelationFailedGenerateUniqueUid = errors.New("failed to generate unique correlation UID")
ErrCorrelationIdentifierNotSet = errors.New("source identifier and org id are needed to be able to edit correlations")
)
// Correlation is the model for correlations definitions
type Correlation struct {
ID int64 `xorm:"pk autoincr 'id'"`
UID string `xorm:"uid"`
SourceUID string `xorm:"source_uid"`
TargetUID string `xorm:"target_uid"`
Label string `xorm:"label"`
Description string `xorm:"description"`
}
type CorrelationDTO struct {
UID string `json:"uid" xorm:"uid"`
SourceUID string `json:"sourceUid"`
TargetUID string `json:"targetUid"`
Label string `json:"label"`
Description string `json:"description"`
}
// CreateCorrelationResponse is a response struct for CorrelationDTO
type CreateCorrelationResponse struct {
Result CorrelationDTO `json:"result"`
}
// CreateCorrelationCommand is the command for creating a correlation
// swagger:model
type CreateCorrelationCommand struct {
// UID of the data source for which correlation is created.
// example: PE1C5CBDA0504A6A3
SourceUID string
OrgId int64
TargetUID string `json:"targetUid" binding:"Required"`
Label string `json:"label"`
Description string `json:"description"`
}
type DeleteCorrelationsBySourceUIDCommand struct {
SourceUID string
}

View File

@ -33,9 +33,6 @@ type DataSourceService interface {
// GetDefaultDataSource gets the default datasource.
GetDefaultDataSource(ctx context.Context, query *GetDefaultDataSourceQuery) error
// CreateCorrelation adds a correlation between two datasources.
CreateCorrelation(ctx context.Context, cmd *CreateCorrelationCommand) error
// GetHTTPTransport gets a datasource specific HTTP transport.
GetHTTPTransport(ctx context.Context, ds *DataSource, provider httpclient.Provider, customMiddlewares ...sdkhttpclient.Middleware) (http.RoundTripper, error)

View File

@ -3,14 +3,12 @@ package datasources
import "errors"
var (
ErrDataSourceNotFound = errors.New("data source not found")
ErrDataSourceNameExists = errors.New("data source with the same name already exists")
ErrDataSourceUidExists = errors.New("data source with the same uid already exists")
ErrDataSourceUpdatingOldVersion = errors.New("trying to update old version of datasource")
ErrDataSourceAccessDenied = errors.New("data source access denied")
ErrDataSourceFailedGenerateUniqueUid = errors.New("failed to generate unique datasource ID")
ErrDataSourceIdentifierNotSet = errors.New("unique identifier and org id are needed to be able to get or delete a datasource")
ErrDatasourceIsReadOnly = errors.New("data source is readonly, can only be updated from configuration")
ErrCorrelationUidExists = errors.New("correlation with the same uid already exists")
ErrCorrelationFailedGenerateUniqueUid = errors.New("failed to generate unique correlation ID")
ErrDataSourceNotFound = errors.New("data source not found")
ErrDataSourceNameExists = errors.New("data source with the same name already exists")
ErrDataSourceUidExists = errors.New("data source with the same uid already exists")
ErrDataSourceUpdatingOldVersion = errors.New("trying to update old version of datasource")
ErrDataSourceAccessDenied = errors.New("data source access denied")
ErrDataSourceFailedGenerateUniqueUid = errors.New("failed to generate unique datasource ID")
ErrDataSourceIdentifierNotSet = errors.New("unique identifier and org id are needed to be able to get or delete a datasource")
ErrDatasourceIsReadOnly = errors.New("data source is readonly, can only be updated from configuration")
)

View File

@ -94,29 +94,6 @@ func (s *FakeDataSourceService) UpdateDataSource(ctx context.Context, cmd *datas
return datasources.ErrDataSourceNotFound
}
func (s *FakeDataSourceService) CreateCorrelation(ctx context.Context, cmd *datasources.CreateCorrelationCommand) error {
for _, datasource := range s.DataSources {
if cmd.SourceUID != "" && cmd.SourceUID == datasource.Uid {
newCorrelation := datasources.Correlation{
Target: cmd.TargetUID,
Label: cmd.Label,
Description: cmd.Description,
}
datasource.Correlations = append(datasource.Correlations, newCorrelation)
cmd.Result = datasources.CorrelationDTO{
Target: newCorrelation.Target,
Description: newCorrelation.Description,
Label: newCorrelation.Label,
Version: cmd.Version + 1,
}
return nil
}
}
return datasources.ErrDataSourceNotFound
}
func (s *FakeDataSourceService) GetDefaultDataSource(ctx context.Context, query *datasources.GetDefaultDataSourceQuery) error {
return nil
}

View File

@ -30,24 +30,6 @@ const (
type DsAccess string
type Correlation struct {
Uid string `json:"uid"`
Target string `json:"target"`
Label string `json:"label,omitempty"`
Description string `json:"description,omitempty"`
}
// CorrelationDTO extends Correlation by adding `Version`, which is persisted and read from the `version` column
// in the data_source table. It's not part of the Correlation model but needed when editing correlations to handle
// concurrent edits.
type CorrelationDTO struct {
Uid string `json:"uid"`
Target string `json:"target"`
Label string `json:"label,omitempty"`
Description string `json:"description,omitempty"`
Version int `json:"version,omitempty"`
}
type DataSource struct {
Id int64 `json:"id,omitempty"`
OrgId int64 `json:"orgId,omitempty"`
@ -67,7 +49,6 @@ type DataSource struct {
BasicAuthPassword string `json:"-"`
WithCredentials bool `json:"withCredentials"`
IsDefault bool `json:"isDefault"`
Correlations []Correlation `json:"correlations"`
JsonData *simplejson.Json `json:"jsonData"`
SecureJsonData map[string][]byte `json:"secureJsonData"`
ReadOnly bool `json:"readOnly"`
@ -113,7 +94,6 @@ type AddDataSourceCommand struct {
BasicAuthUser string `json:"basicAuthUser"`
WithCredentials bool `json:"withCredentials"`
IsDefault bool `json:"isDefault"`
Correlations []Correlation `json:"-"`
JsonData *simplejson.Json `json:"jsonData"`
SecureJsonData map[string]string `json:"secureJsonData"`
Uid string `json:"uid"`
@ -139,7 +119,6 @@ type UpdateDataSourceCommand struct {
BasicAuthUser string `json:"basicAuthUser"`
WithCredentials bool `json:"withCredentials"`
IsDefault bool `json:"isDefault"`
Correlations []Correlation `json:"-"`
JsonData *simplejson.Json `json:"jsonData"`
SecureJsonData map[string]string `json:"secureJsonData"`
Version int `json:"version"`
@ -168,30 +147,6 @@ type DeleteDataSourceCommand struct {
UpdateSecretFn UpdateSecretFn
}
// CreateCorrelationCommand adds a correlation
type CreateCorrelationCommand struct {
TargetUID string `json:"targetUid" binding:"Required"`
Label string `json:"label"`
Description string `json:"description"`
Version int `json:"version"`
Uid string `json:"-"`
SourceUID string `json:"-"`
OrgID int64 `json:"-"`
Result CorrelationDTO `json:"-"`
}
// UpdateCorrelationsCommand updates a correlation
type UpdateCorrelationsCommand struct {
SourceUID string
Uid string
Correlations []Correlation
OrgId int64
Version int
Result []Correlation
}
// Function for updating secrets along with datasources, to ensure atomicity
type UpdateSecretFn func() error

View File

@ -22,7 +22,6 @@ import (
"github.com/grafana/grafana/pkg/services/secrets/kvstore"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
)
type Service struct {
@ -238,93 +237,6 @@ func (s *Service) GetDefaultDataSource(ctx context.Context, query *datasources.G
return s.SQLStore.GetDefaultDataSource(ctx, query)
}
// As correlations gets added in the data_source table as JSON, there's no way to delete an entry
// from the correlations column when the target datasource gets deleted from grafana (or changes orgId).
// We therefore need to either:
// 1) do a full table scan whenever we delete a datasource and delete correlations having the deleted DS as a target
// 2) setup a cleanup job that cleans the correlation column from stale data
// 3) do nothing and handle it in the FE
func (s *Service) CreateCorrelation(ctx context.Context, cmd *datasources.CreateCorrelationCommand) error {
return s.SQLStore.InTransaction(ctx, func(ctx context.Context) error {
var err error
// TODO: the following is far from efficient, but should be enough to get a POC running
query := &datasources.GetDataSourceQuery{
OrgId: cmd.OrgID,
Uid: cmd.SourceUID,
}
if err = s.SQLStore.GetDataSource(ctx, query); err != nil {
// Source datasource does not exist
return err
}
ds := query.Result
if ds.ReadOnly {
return datasources.ErrDatasourceIsReadOnly
}
if err = s.SQLStore.GetDataSource(ctx, &datasources.GetDataSourceQuery{
OrgId: cmd.OrgID,
Uid: cmd.TargetUID,
}); err != nil {
// target datasource does not exist
return err
}
if cmd.Uid != "" {
for _, correlation := range ds.Correlations {
if correlation.Target == cmd.Uid {
return datasources.ErrCorrelationUidExists
}
}
} else {
uidMap := make(map[string]bool)
for _, correlation := range ds.Correlations {
uidMap[correlation.Uid] = true
}
for i := 0; i < 10; i++ {
uid := util.GenerateShortUID()
if exists := uidMap[uid]; !exists {
cmd.Uid = uid
break
}
}
if cmd.Uid == "" {
return datasources.ErrCorrelationFailedGenerateUniqueUid
}
}
newCorrelation := datasources.Correlation{
Uid: cmd.Uid,
Target: cmd.TargetUID,
Label: cmd.Label,
Description: cmd.Description,
}
ds.Correlations = append(ds.Correlations, newCorrelation)
updateCmd := &datasources.UpdateCorrelationsCommand{
SourceUID: ds.Uid,
OrgId: cmd.OrgID,
Correlations: ds.Correlations,
Version: cmd.Version,
}
if err = s.SQLStore.UpdateCorrelations(ctx, updateCmd); err == nil {
cmd.Result = datasources.CorrelationDTO{
Target: newCorrelation.Target,
Description: newCorrelation.Description,
Label: newCorrelation.Label,
Version: updateCmd.Version,
}
}
return err
})
}
func (s *Service) GetHTTPClient(ctx context.Context, ds *datasources.DataSource, provider httpclient.Provider) (*http.Client, error) {
transport, err := s.GetHTTPTransport(ctx, ds, provider)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"errors"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/correlations"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/provisioning/utils"
)
@ -16,6 +17,11 @@ type Store interface {
DeleteDataSource(ctx context.Context, cmd *datasources.DeleteDataSourceCommand) error
}
type CorrelationsStore interface {
DeleteCorrelationsBySourceUID(ctx context.Context, cmd correlations.DeleteCorrelationsBySourceUIDCommand) error
CreateCorrelation(ctx context.Context, cmd correlations.CreateCorrelationCommand) (correlations.CorrelationDTO, error)
}
var (
// ErrInvalidConfigToManyDefault indicates that multiple datasource in the provisioning files
// contains more than one datasource marked as default.
@ -24,24 +30,26 @@ var (
// Provision scans a directory for provisioning config files
// and provisions the datasource in those files.
func Provision(ctx context.Context, configDirectory string, store Store, orgStore utils.OrgStore) error {
dc := newDatasourceProvisioner(log.New("provisioning.datasources"), store, orgStore)
func Provision(ctx context.Context, configDirectory string, store Store, correlationsStore CorrelationsStore, orgStore utils.OrgStore) error {
dc := newDatasourceProvisioner(log.New("provisioning.datasources"), store, correlationsStore, orgStore)
return dc.applyChanges(ctx, configDirectory)
}
// DatasourceProvisioner is responsible for provisioning datasources based on
// configuration read by the `configReader`
type DatasourceProvisioner struct {
log log.Logger
cfgProvider *configReader
store Store
log log.Logger
cfgProvider *configReader
store Store
correlationsStore CorrelationsStore
}
func newDatasourceProvisioner(log log.Logger, store Store, orgStore utils.OrgStore) DatasourceProvisioner {
func newDatasourceProvisioner(log log.Logger, store Store, correlationsStore CorrelationsStore, orgStore utils.OrgStore) DatasourceProvisioner {
return DatasourceProvisioner{
log: log,
cfgProvider: &configReader{log: log, orgStore: orgStore},
store: store,
log: log,
cfgProvider: &configReader{log: log, orgStore: orgStore},
store: store,
correlationsStore: correlationsStore,
}
}
@ -50,6 +58,8 @@ func (dc *DatasourceProvisioner) apply(ctx context.Context, cfg *configs) error
return err
}
correlationsToInsert := make([]correlations.CreateCorrelationCommand, 0)
for _, ds := range cfg.Datasources {
cmd := &datasources.GetDataSourceQuery{OrgId: ds.OrgID, Name: ds.Name}
err := dc.store.GetDataSource(ctx, cmd)
@ -63,15 +73,50 @@ func (dc *DatasourceProvisioner) apply(ctx context.Context, cfg *configs) error
if err := dc.store.AddDataSource(ctx, insertCmd); err != nil {
return err
}
for _, correlation := range ds.Correlations {
if field, ok := correlation.(map[string]interface{}); ok {
correlationsToInsert = append(correlationsToInsert, correlations.CreateCorrelationCommand{
SourceUID: insertCmd.Result.Uid,
TargetUID: field["targetUid"].(string),
Label: field["label"].(string),
Description: field["description"].(string),
OrgId: insertCmd.OrgId,
})
}
}
} else {
updateCmd := createUpdateCommand(ds, cmd.Result.Id)
dc.log.Debug("updating datasource from configuration", "name", updateCmd.Name, "uid", updateCmd.Uid)
if err := dc.store.UpdateDataSource(ctx, updateCmd); err != nil {
return err
}
if len(ds.Correlations) > 0 {
dc.correlationsStore.DeleteCorrelationsBySourceUID(ctx, correlations.DeleteCorrelationsBySourceUIDCommand{
SourceUID: cmd.Result.Uid,
})
}
for _, correlation := range ds.Correlations {
if field, ok := correlation.(map[string]interface{}); ok {
correlationsToInsert = append(correlationsToInsert, correlations.CreateCorrelationCommand{
SourceUID: cmd.Result.Uid,
TargetUID: field["targetUid"].(string),
Label: field["label"].(string),
Description: field["description"].(string),
OrgId: updateCmd.OrgId,
})
}
}
}
}
for _, createCorrelationCmd := range correlationsToInsert {
dc.correlationsStore.CreateCorrelation(ctx, createCorrelationCmd)
}
return nil
}

View File

@ -9,7 +9,6 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/provisioning/values"
"github.com/grafana/grafana/pkg/util"
)
// ConfigVersion is used to figure out which API version a config uses.
@ -215,7 +214,6 @@ func createInsertCommand(ds *upsertDataSourceFromConfig) *datasources.AddDataSou
BasicAuthUser: ds.BasicAuthUser,
WithCredentials: ds.WithCredentials,
IsDefault: ds.IsDefault,
Correlations: makeCorrelations(ds.Correlations),
JsonData: jsonData,
SecureJsonData: ds.SecureJSONData,
ReadOnly: !ds.Editable,
@ -228,34 +226,6 @@ func createInsertCommand(ds *upsertDataSourceFromConfig) *datasources.AddDataSou
return cmd
}
func makeCorrelations(correlations []interface{}) []datasources.Correlation {
ret := make([]datasources.Correlation, 0)
uidMap := make(map[string]bool)
for _, v := range correlations {
if field, ok := v.(map[string]interface{}); ok {
uid := ""
for i := 0; i < 10; i++ {
newUid := util.GenerateShortUID()
if exists := uidMap[newUid]; !exists {
uid = newUid
uidMap[uid] = true
break
}
}
ret = append(ret, datasources.Correlation{
Uid: uid,
Target: field["targetUid"].(string),
Description: field["description"].(string),
Label: field["label"].(string),
})
}
}
return ret
}
func safeUIDFromName(name string) string {
h := sha256.New()
_, _ = h.Write([]byte(name))
@ -285,7 +255,6 @@ func createUpdateCommand(ds *upsertDataSourceFromConfig, id int64) *datasources.
BasicAuthUser: ds.BasicAuthUser,
WithCredentials: ds.WithCredentials,
IsDefault: ds.IsDefault,
Correlations: makeCorrelations(ds.Correlations),
JsonData: jsonData,
SecureJsonData: ds.SecureJSONData,
ReadOnly: !ds.Editable,

View File

@ -10,6 +10,7 @@ import (
plugifaces "github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/correlations"
dashboardservice "github.com/grafana/grafana/pkg/services/dashboards"
datasourceservice "github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/encryption"
@ -29,6 +30,7 @@ func ProvideService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, pluginStore p
encryptionService encryption.Internal, notificatonService *notifications.NotificationService,
dashboardProvisioningService dashboardservice.DashboardProvisioningService,
datasourceService datasourceservice.DataSourceService,
correlationsService correlations.Service,
dashboardService dashboardservice.DashboardService,
alertingService *alerting.AlertNotificationService, pluginSettings pluginsettings.Service,
searchService searchV2.SearchService,
@ -47,6 +49,7 @@ func ProvideService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, pluginStore p
dashboardProvisioningService: dashboardProvisioningService,
dashboardService: dashboardService,
datasourceService: datasourceService,
correlationsService: correlationsService,
alertingService: alertingService,
pluginsSettings: pluginSettings,
searchService: searchService,
@ -80,7 +83,7 @@ func NewProvisioningServiceImpl() *ProvisioningServiceImpl {
func newProvisioningServiceImpl(
newDashboardProvisioner dashboards.DashboardProvisionerFactory,
provisionNotifiers func(context.Context, string, notifiers.Manager, notifiers.SQLStore, encryption.Internal, *notifications.NotificationService) error,
provisionDatasources func(context.Context, string, datasources.Store, utils.OrgStore) error,
provisionDatasources func(context.Context, string, datasources.Store, datasources.CorrelationsStore, utils.OrgStore) error,
provisionPlugins func(context.Context, string, plugins.Store, plugifaces.Store, pluginsettings.Service) error,
) *ProvisioningServiceImpl {
return &ProvisioningServiceImpl{
@ -103,12 +106,13 @@ type ProvisioningServiceImpl struct {
newDashboardProvisioner dashboards.DashboardProvisionerFactory
dashboardProvisioner dashboards.DashboardProvisioner
provisionNotifiers func(context.Context, string, notifiers.Manager, notifiers.SQLStore, encryption.Internal, *notifications.NotificationService) error
provisionDatasources func(context.Context, string, datasources.Store, utils.OrgStore) error
provisionDatasources func(context.Context, string, datasources.Store, datasources.CorrelationsStore, utils.OrgStore) error
provisionPlugins func(context.Context, string, plugins.Store, plugifaces.Store, pluginsettings.Service) error
mutex sync.Mutex
dashboardProvisioningService dashboardservice.DashboardProvisioningService
dashboardService dashboardservice.DashboardService
datasourceService datasourceservice.DataSourceService
correlationsService correlations.Service
alertingService *alerting.AlertNotificationService
pluginsSettings pluginsettings.Service
searchService searchV2.SearchService
@ -167,7 +171,7 @@ func (ps *ProvisioningServiceImpl) Run(ctx context.Context) error {
func (ps *ProvisioningServiceImpl) ProvisionDatasources(ctx context.Context) error {
datasourcePath := filepath.Join(ps.Cfg.ProvisioningPath, "datasources")
if err := ps.provisionDatasources(ctx, datasourcePath, ps.datasourceService, ps.SQLStore); err != nil {
if err := ps.provisionDatasources(ctx, datasourcePath, ps.datasourceService, ps.correlationsService, ps.SQLStore); err != nil {
err = fmt.Errorf("%v: %w", "Datasource provisioning error", err)
ps.log.Error("Failed to provision data sources", "error", err)
return err

View File

@ -123,6 +123,8 @@ func (ss *SQLStore) DeleteDataSource(ctx context.Context, cmd *datasources.Delet
}
}
// TODO: delete correlations having sourceUID or targetUID = cmd.UID
// Publish data source deletion event
sess.publishAfterCommit(&events.DataSourceDeleted{
Timestamp: time.Now(),
@ -169,7 +171,6 @@ func (ss *SQLStore) AddDataSource(ctx context.Context, cmd *datasources.AddDataS
BasicAuth: cmd.BasicAuth,
BasicAuthUser: cmd.BasicAuthUser,
WithCredentials: cmd.WithCredentials,
Correlations: cmd.Correlations,
JsonData: cmd.JsonData,
SecureJsonData: cmd.EncryptedSecureJsonData,
Created: time.Now(),
@ -290,40 +291,6 @@ func (ss *SQLStore) UpdateDataSource(ctx context.Context, cmd *datasources.Updat
})
}
func (ss *SQLStore) UpdateCorrelations(ctx context.Context, cmd *datasources.UpdateCorrelationsCommand) error {
return ss.WithTransactionalDbSession(ctx, func(sess *DBSession) error {
ds := &datasources.DataSource{
Correlations: cmd.Correlations,
Updated: time.Now(),
Version: cmd.Version + 1,
}
updateSession := sess.Omit("json_data")
if cmd.Version != 0 {
// the reason we allow cmd.version > db.version is make it possible for people to force
// updates to datasources using the datasource.yaml file without knowing exactly what version
// a datasource have in the db.
updateSession = updateSession.Where("uid=? and org_id=? and version < ?", cmd.SourceUID, cmd.OrgId, ds.Version)
} else {
updateSession = updateSession.Where("uid=? and org_id=?", cmd.SourceUID, cmd.OrgId)
}
affected, err := updateSession.Update(ds)
if err != nil {
return err
}
if affected == 0 {
return datasources.ErrDataSourceUpdatingOldVersion
}
cmd.Version = ds.Version
cmd.Result = cmd.Correlations
return err
})
}
func generateNewDatasourceUid(sess *DBSession, orgId int64) (string, error) {
for i := 0; i < 3; i++ {
uid := generateNewUid()

View File

@ -0,0 +1,22 @@
package migrations
import (
. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
func addCorrelationsMigrations(mg *Migrator) {
correlationsV1 := Table{
Name: "correlation",
Columns: []*Column{
{Name: "id", Type: DB_BigInt, Nullable: false, IsPrimaryKey: true, IsAutoIncrement: true},
{Name: "uid", Type: DB_NVarchar, Length: 40, Nullable: false},
{Name: "source_uid", Type: DB_NVarchar, Length: 40, Nullable: false},
// Nullable because in the future we want to have correlations to external resources
{Name: "target_uid", Type: DB_NVarchar, Length: 40, Nullable: true},
{Name: "label", Type: DB_Text, Nullable: false},
{Name: "description", Type: DB_Text, Nullable: false},
},
}
mg.AddMigration("create correlation table v1", NewAddTableMigration(correlationsV1))
}

View File

@ -134,9 +134,4 @@ func addDataSourceMigration(mg *Migrator) {
mg.AddMigration("add unique index datasource_org_id_is_default", NewAddIndexMigration(tableV2, &Index{
Cols: []string{"org_id", "is_default"}}))
// add column correlations
mg.AddMigration("Add correlations column", NewAddColumnMigration(tableV2, &Column{
Name: "correlations", Type: DB_Text, Nullable: true,
}))
}

View File

@ -75,6 +75,8 @@ func (*OSSMigrations) AddMigration(mg *Migrator) {
addQueryHistoryStarMigrations(mg)
addCorrelationsMigrations(mg)
if mg.Cfg != nil && mg.Cfg.IsFeatureToggleEnabled != nil {
if mg.Cfg.IsFeatureToggleEnabled(featuremgmt.FlagDashboardComments) || mg.Cfg.IsFeatureToggleEnabled(featuremgmt.FlagAnnotationComments) {
addCommentGroupMigrations(mg)