diff --git a/pkg/api/dashboard.go b/pkg/api/dashboard.go index 8809c9fccfb..5fe06616847 100644 --- a/pkg/api/dashboard.go +++ b/pkg/api/dashboard.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/guardian" pref "github.com/grafana/grafana/pkg/services/preference" + "github.com/grafana/grafana/pkg/services/store" "github.com/grafana/grafana/pkg/util" "github.com/grafana/grafana/pkg/web" ) @@ -277,6 +278,16 @@ func (hs *HTTPServer) deleteDashboard(c *models.ReqContext) response.Response { } return response.Error(500, "Failed to delete dashboard", err) } + + if hs.entityEventsService != nil { + if err := hs.entityEventsService.SaveEvent(c.Req.Context(), store.SaveEventCmd{ + EntityId: store.CreateDatabaseEntityId(dash.Uid, dash.OrgId, store.EntityTypeDashboard), + EventType: store.EntityEventTypeDelete, + }); err != nil { + hs.log.Warn("failed to save dashboard entity event", "uid", dash.Uid, "error", err) + } + } + if hs.Live != nil { err := hs.Live.GrafanaScope.Dashboards.DashboardDeleted(c.OrgId, c.ToUserDisplayDTO(), dash.Uid) if err != nil { @@ -362,6 +373,15 @@ func (hs *HTTPServer) postDashboard(c *models.ReqContext, cmd models.SaveDashboa dashboard, err := hs.dashboardService.SaveDashboard(alerting.WithUAEnabled(ctx, hs.Cfg.UnifiedAlerting.IsEnabled()), dashItem, allowUiUpdate) + if dashboard != nil && hs.entityEventsService != nil { + if err := hs.entityEventsService.SaveEvent(ctx, store.SaveEventCmd{ + EntityId: store.CreateDatabaseEntityId(dashboard.Uid, dashboard.OrgId, store.EntityTypeDashboard), + EventType: store.EntityEventTypeUpdate, + }); err != nil { + hs.log.Warn("failed to save dashboard entity event", "uid", dashboard.Uid, "error", err) + } + } + if hs.Live != nil { // Tell everyone listening that the dashboard changed if dashboard == nil { diff --git a/pkg/api/http_server.go b/pkg/api/http_server.go index 0d4e4241744..8d59c483f3a 100644 --- a/pkg/api/http_server.go +++ b/pkg/api/http_server.go @@ -151,6 +151,7 @@ type HTTPServer struct { PluginSettings *pluginSettings.Service AvatarCacheServer *avatar.AvatarCacheServer preferenceService pref.Service + entityEventsService store.EntityEventsService } type ServerOptions struct { @@ -182,7 +183,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi dashboardProvisioningService dashboards.DashboardProvisioningService, folderService dashboards.FolderService, datasourcePermissionsService permissions.DatasourcePermissionsService, alertNotificationService *alerting.AlertNotificationService, dashboardsnapshotsService *dashboardsnapshots.Service, commentsService *comments.Service, pluginSettings *pluginSettings.Service, - avatarCacheServer *avatar.AvatarCacheServer, preferenceService pref.Service, + avatarCacheServer *avatar.AvatarCacheServer, preferenceService pref.Service, entityEventsService store.EntityEventsService, ) (*HTTPServer, error) { web.Env = cfg.Env m := web.New() @@ -257,6 +258,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi permissionServices: permissionsServices, AvatarCacheServer: avatarCacheServer, preferenceService: preferenceService, + entityEventsService: entityEventsService, } if hs.Listener != nil { hs.log.Debug("Using provided listener") diff --git a/pkg/plugins/manager/manager_integration_test.go b/pkg/plugins/manager/manager_integration_test.go index cf6eec40fac..a79742404c2 100644 --- a/pkg/plugins/manager/manager_integration_test.go +++ b/pkg/plugins/manager/manager_integration_test.go @@ -86,7 +86,7 @@ func TestPluginManager_int_init(t *testing.T) { pg := postgres.ProvideService(cfg) my := mysql.ProvideService(cfg, hcp) ms := mssql.ProvideService(cfg) - sv2 := searchV2.ProvideService(sqlstore.InitTestDB(t)) + sv2 := searchV2.ProvideService(cfg, sqlstore.InitTestDB(t), nil) graf := grafanads.ProvideService(cfg, sv2, nil) coreRegistry := coreplugin.ProvideCoreRegistry(am, cw, cm, es, grap, idb, lk, otsdb, pr, tmpo, td, pg, my, ms, graf) diff --git a/pkg/server/backgroundsvcs/background_services.go b/pkg/server/backgroundsvcs/background_services.go index 529e7abde09..d891a17c4ca 100644 --- a/pkg/server/backgroundsvcs/background_services.go +++ b/pkg/server/backgroundsvcs/background_services.go @@ -21,6 +21,7 @@ import ( plugindashboardsservice "github.com/grafana/grafana/pkg/services/plugindashboards/service" "github.com/grafana/grafana/pkg/services/provisioning" "github.com/grafana/grafana/pkg/services/rendering" + "github.com/grafana/grafana/pkg/services/searchV2" secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager" "github.com/grafana/grafana/pkg/services/serviceaccounts" "github.com/grafana/grafana/pkg/services/store" @@ -36,7 +37,7 @@ func ProvideBackgroundServiceRegistry( statsCollector *statscollector.Service, grafanaUpdateChecker *updatechecker.GrafanaService, pluginsUpdateChecker *updatechecker.PluginsService, metrics *metrics.InternalMetricsService, secretsService *secretsManager.SecretsService, remoteCache *remotecache.RemoteCache, - thumbnailsService thumbs.Service, StorageService store.StorageService, + thumbnailsService thumbs.Service, StorageService store.StorageService, searchService searchV2.SearchService, entityEventsService store.EntityEventsService, // Need to make sure these are initialized, is there a better place to put them? _ *dashboardsnapshots.Service, _ *alerting.AlertNotificationService, _ serviceaccounts.Service, _ *guardian.Provider, @@ -64,6 +65,8 @@ func ProvideBackgroundServiceRegistry( secretsService, StorageService, thumbnailsService, + searchService, + entityEventsService, ) } diff --git a/pkg/server/wire.go b/pkg/server/wire.go index 4461854bd5b..e0d7cf8ee2b 100644 --- a/pkg/server/wire.go +++ b/pkg/server/wire.go @@ -5,7 +5,6 @@ package server import ( "github.com/google/wire" - sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana/pkg/api" @@ -153,6 +152,7 @@ var wireBasicSet = wire.NewSet( postgres.ProvideService, mysql.ProvideService, mssql.ProvideService, + store.ProvideEntityEventsService, httpclientprovider.New, wire.Bind(new(httpclient.Provider), new(*sdkhttpclient.Provider)), serverlock.ProvideService, diff --git a/pkg/services/searchV2/index.go b/pkg/services/searchV2/index.go new file mode 100644 index 00000000000..43571ca0757 --- /dev/null +++ b/pkg/services/searchV2/index.go @@ -0,0 +1,406 @@ +package searchV2 + +import ( + "bytes" + "context" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/searchV2/extract" + "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/grafana/grafana/pkg/services/store" +) + +type dashboardLoader interface { + // LoadDashboards returns slice of dashboards. If dashboardUID is empty – then + // implementation must return all dashboards in instance to build an entire + // dashboard index for an organization. If dashboardUID is not empty – then only + // return dashboard with specified UID or empty slice if not found (this is required + // to apply partial update). + LoadDashboards(ctx context.Context, orgID int64, dashboardUID string) ([]dashboard, error) +} + +type eventStore interface { + GetLastEvent(ctx context.Context) (*store.EntityEvent, error) + GetAllEventsAfter(ctx context.Context, id int64) ([]*store.EntityEvent, error) +} + +type dashboardIndex struct { + mu sync.RWMutex + loader dashboardLoader + dashboards map[int64][]dashboard // orgId -> []dashboards + eventStore eventStore + logger log.Logger +} + +type dashboard struct { + id int64 + uid string + isFolder bool + folderID int64 + slug string + created time.Time + updated time.Time + info *extract.DashboardInfo +} + +func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore) *dashboardIndex { + return &dashboardIndex{ + loader: dashLoader, + eventStore: evStore, + dashboards: map[int64][]dashboard{}, + logger: log.New("dashboardIndex"), + } +} + +func (i *dashboardIndex) run(ctx context.Context) error { + fullReIndexTicker := time.NewTicker(5 * time.Minute) + defer fullReIndexTicker.Stop() + + partialUpdateTicker := time.NewTicker(5 * time.Second) + defer partialUpdateTicker.Stop() + + var lastEventID int64 + lastEvent, err := i.eventStore.GetLastEvent(ctx) + if err != nil { + return err + } + if lastEvent != nil { + lastEventID = lastEvent.Id + } + + // Build on start for orgID 1 but keep lazy for others. + _, err = i.getDashboards(ctx, 1) + if err != nil { + return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err) + } + + for { + select { + case <-partialUpdateTicker.C: + lastEventID = i.applyIndexUpdates(ctx, lastEventID) + case <-fullReIndexTicker.C: + started := time.Now() + i.reIndexFromScratch(ctx) + i.logger.Info("Full re-indexing finished", "fullReIndexElapsed", time.Since(started)) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (i *dashboardIndex) reIndexFromScratch(ctx context.Context) { + i.mu.RLock() + orgIDs := make([]int64, 0, len(i.dashboards)) + for orgID := range i.dashboards { + orgIDs = append(orgIDs, orgID) + } + i.mu.RUnlock() + + for _, orgID := range orgIDs { + started := time.Now() + ctx, cancel := context.WithTimeout(ctx, time.Minute) + dashboards, err := i.loader.LoadDashboards(ctx, orgID, "") + if err != nil { + cancel() + i.logger.Error("Error re-indexing dashboards for organization", "orgId", orgID, "error", err) + continue + } + cancel() + i.logger.Info("Re-indexed dashboards for organization", "orgId", orgID, "orgReIndexElapsed", time.Since(started)) + i.mu.Lock() + i.dashboards[orgID] = dashboards + i.mu.Unlock() + } +} + +func (i *dashboardIndex) applyIndexUpdates(ctx context.Context, lastEventID int64) int64 { + events, err := i.eventStore.GetAllEventsAfter(context.Background(), lastEventID) + if err != nil { + i.logger.Error("can't load events", "error", err) + return lastEventID + } + if len(events) == 0 { + return lastEventID + } + started := time.Now() + for _, e := range events { + i.logger.Debug("processing event", "event", e) + err := i.applyEventOnIndex(ctx, e) + if err != nil { + i.logger.Error("can't apply event", "error", err) + return lastEventID + } + lastEventID = e.Id + } + i.logger.Info("Index updates applied", "indexEventsAppliedElapsed", time.Since(started), "numEvents", len(events)) + return lastEventID +} + +func (i *dashboardIndex) applyEventOnIndex(ctx context.Context, e *store.EntityEvent) error { + if !strings.HasPrefix(e.EntityId, "database/") { + i.logger.Warn("unknown storage", "entityId", e.EntityId) + return nil + } + parts := strings.Split(strings.TrimPrefix(e.EntityId, "database/"), "/") + if len(parts) != 3 { + i.logger.Error("can't parse entityId", "entityId", e.EntityId) + return nil + } + orgIDStr := parts[0] + kind := parts[1] + dashboardUID := parts[2] + if kind != "dashboard" { + i.logger.Error("unknown kind in entityId", "entityId", e.EntityId) + return nil + } + orgID, err := strconv.Atoi(orgIDStr) + if err != nil { + i.logger.Error("can't extract org ID", "entityId", e.EntityId) + return nil + } + return i.applyDashboardEvent(ctx, int64(orgID), dashboardUID, e.EventType) +} + +func (i *dashboardIndex) applyDashboardEvent(ctx context.Context, orgID int64, dashboardUID string, _ store.EntityEventType) error { + i.mu.Lock() + _, ok := i.dashboards[orgID] + if !ok { + // Skip event for org not yet indexed. + i.mu.Unlock() + return nil + } + i.mu.Unlock() + + dbDashboards, err := i.loader.LoadDashboards(ctx, orgID, dashboardUID) + if err != nil { + return err + } + + i.mu.Lock() + defer i.mu.Unlock() + + dashboards, ok := i.dashboards[orgID] + if !ok { + // Skip event for org not yet fully indexed. + return nil + } + + // In the future we can rely on operation types to reduce work here. + if len(dbDashboards) == 0 { + // Delete. + i.dashboards[orgID] = removeDashboard(dashboards, dashboardUID) + } else { + updated := false + for i, d := range dashboards { + if d.uid == dashboardUID { + // Update. + dashboards[i] = dbDashboards[0] + updated = true + break + } + } + if !updated { + // Create. + dashboards = append(dashboards, dbDashboards...) + } + i.dashboards[orgID] = dashboards + } + return nil +} + +func removeDashboard(dashboards []dashboard, dashboardUID string) []dashboard { + k := 0 + for _, d := range dashboards { + if d.uid != dashboardUID { + dashboards[k] = d + k++ + } + } + return dashboards[:k] +} + +func (i *dashboardIndex) getDashboards(ctx context.Context, orgId int64) ([]dashboard, error) { + var dashboards []dashboard + + i.mu.Lock() + defer i.mu.Unlock() + + if cachedDashboards, ok := i.dashboards[orgId]; ok { + dashboards = cachedDashboards + } else { + // Load and parse all dashboards for given orgId. + var err error + dashboards, err = i.loader.LoadDashboards(ctx, orgId, "") + if err != nil { + return nil, err + } + i.dashboards[orgId] = dashboards + } + return dashboards, nil +} + +type sqlDashboardLoader struct { + sql *sqlstore.SQLStore +} + +func (l sqlDashboardLoader) LoadDashboards(ctx context.Context, orgID int64, dashboardUID string) ([]dashboard, error) { + var dashboards []dashboard + + limit := 1 + + if dashboardUID == "" { + limit = 200 + dashboards = make([]dashboard, 0, limit+1) + + // Add the root folder ID (does not exist in SQL). + dashboards = append(dashboards, dashboard{ + id: 0, + isFolder: true, + folderID: 0, + slug: "", + created: time.Now(), + updated: time.Now(), + info: &extract.DashboardInfo{ + ID: 0, + UID: "", + Title: "General", + }, + }) + } + + // key will allow name or uid + lookup, err := loadDatasourceLookup(ctx, orgID, l.sql) + if err != nil { + return dashboards, err + } + + var lastID int64 + + for { + rows := make([]*dashboardQueryResult, 0, limit) + + err = l.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { + sess.Table("dashboard"). + Where("org_id = ?", orgID) + + if lastID > 0 { + sess.Where("id > ?", lastID) + } + + if dashboardUID != "" { + sess.Where("uid = ?", dashboardUID) + } + + sess.Cols("id", "uid", "is_folder", "folder_id", "data", "slug", "created", "updated") + + sess.Limit(limit) + + return sess.Find(&rows) + }) + + if err != nil { + return nil, err + } + + for _, row := range rows { + dashboards = append(dashboards, dashboard{ + id: row.Id, + uid: row.Uid, + isFolder: row.IsFolder, + folderID: row.FolderID, + slug: row.Slug, + created: row.Created, + updated: row.Updated, + info: extract.ReadDashboard(bytes.NewReader(row.Data), lookup), + }) + lastID = row.Id + } + + if len(rows) < limit || dashboardUID != "" { + break + } + } + + return dashboards, err +} + +type dashboardQueryResult struct { + Id int64 + Uid string + IsFolder bool `xorm:"is_folder"` + FolderID int64 `xorm:"folder_id"` + Slug string `xorm:"slug"` + Data []byte + Created time.Time + Updated time.Time +} + +type datasourceQueryResult struct { + UID string `xorm:"uid"` + Type string `xorm:"type"` + Name string `xorm:"name"` + IsDefault bool `xorm:"is_default"` +} + +func loadDatasourceLookup(ctx context.Context, orgID int64, sql *sqlstore.SQLStore) (extract.DatasourceLookup, error) { + byUID := make(map[string]*extract.DataSourceRef, 50) + byName := make(map[string]*extract.DataSourceRef, 50) + var defaultDS *extract.DataSourceRef + + err := sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { + rows := make([]*datasourceQueryResult, 0) + sess.Table("data_source"). + Where("org_id = ?", orgID). + Cols("uid", "name", "type", "is_default") + + err := sess.Find(&rows) + if err != nil { + return err + } + + for _, row := range rows { + ds := &extract.DataSourceRef{ + UID: row.UID, + Type: row.Type, + } + byUID[row.UID] = ds + byName[row.Name] = ds + if row.IsDefault { + defaultDS = ds + } + } + + return nil + }) + if err != nil { + return nil, err + } + + // Lookup by UID or name + return func(ref *extract.DataSourceRef) *extract.DataSourceRef { + if ref == nil { + return defaultDS + } + key := "" + if ref.UID != "" { + ds, ok := byUID[ref.UID] + if ok { + return ds + } + key = ref.UID + } + if key == "" { + return defaultDS + } + ds, ok := byUID[key] + if ok { + return ds + } + return byName[key] + }, err +} diff --git a/pkg/services/searchV2/index_test.go b/pkg/services/searchV2/index_test.go new file mode 100644 index 00000000000..addad470c0a --- /dev/null +++ b/pkg/services/searchV2/index_test.go @@ -0,0 +1,94 @@ +package searchV2 + +import ( + "context" + "testing" + + "github.com/grafana/grafana/pkg/services/store" + "github.com/stretchr/testify/require" +) + +type testDashboardLoader struct { + dashboards []dashboard +} + +func (t *testDashboardLoader) LoadDashboards(ctx context.Context, orgID int64, dashboardUID string) ([]dashboard, error) { + return t.dashboards, nil +} + +func TestDashboardIndexCreate(t *testing.T) { + dashboardLoader := &testDashboardLoader{ + dashboards: []dashboard{ + { + uid: "1", + }, + }, + } + index := newDashboardIndex(dashboardLoader, &store.MockEntityEventsService{}) + require.NotNil(t, index) + dashboards, err := index.getDashboards(context.Background(), 1) + require.NoError(t, err) + require.Len(t, dashboards, 1) + + dashboardLoader.dashboards = []dashboard{ + { + uid: "2", + }, + } + err = index.applyDashboardEvent(context.Background(), 1, "2", "") + require.NoError(t, err) + dashboards, err = index.getDashboards(context.Background(), 1) + require.NoError(t, err) + require.Len(t, dashboards, 2) +} + +func TestDashboardIndexUpdate(t *testing.T) { + dashboardLoader := &testDashboardLoader{ + dashboards: []dashboard{ + { + uid: "1", + slug: "test", + }, + }, + } + index := newDashboardIndex(dashboardLoader, nil) + require.NotNil(t, index) + dashboards, err := index.getDashboards(context.Background(), 1) + require.NoError(t, err) + require.Len(t, dashboards, 1) + + dashboardLoader.dashboards = []dashboard{ + { + uid: "1", + slug: "updated", + }, + } + err = index.applyDashboardEvent(context.Background(), 1, "1", "") + require.NoError(t, err) + dashboards, err = index.getDashboards(context.Background(), 1) + require.NoError(t, err) + require.Len(t, dashboards, 1) + require.Equal(t, "updated", dashboards[0].slug) +} + +func TestDashboardIndexDelete(t *testing.T) { + dashboardLoader := &testDashboardLoader{ + dashboards: []dashboard{ + { + uid: "1", + }, + }, + } + index := newDashboardIndex(dashboardLoader, nil) + require.NotNil(t, index) + dashboards, err := index.getDashboards(context.Background(), 1) + require.NoError(t, err) + require.Len(t, dashboards, 1) + + dashboardLoader.dashboards = []dashboard{} + err = index.applyDashboardEvent(context.Background(), 1, "1", "") + require.NoError(t, err) + dashboards, err = index.getDashboards(context.Background(), 1) + require.NoError(t, err) + require.Len(t, dashboards, 0) +} diff --git a/pkg/services/searchV2/service.go b/pkg/services/searchV2/service.go index 3998b0ac224..c08ff51d2a3 100644 --- a/pkg/services/searchV2/service.go +++ b/pkg/services/searchV2/service.go @@ -1,49 +1,62 @@ package searchV2 import ( - "bytes" "context" "encoding/json" "fmt" "strconv" - "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/registry" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/searchV2/extract" + "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/grafana/grafana/pkg/services/store" + "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/services/searchV2/extract" - "github.com/grafana/grafana/pkg/services/sqlstore" ) type StandardSearchService struct { + registry.BackgroundService + + cfg *setting.Cfg sql *sqlstore.SQLStore auth FutureAuthService // eventually injected from elsewhere + + logger log.Logger + dashboardIndex *dashboardIndex } -func ProvideService(sql *sqlstore.SQLStore) SearchService { +func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService) SearchService { return &StandardSearchService{ + cfg: cfg, sql: sql, auth: &simpleSQLAuthService{ sql: sql, }, + dashboardIndex: newDashboardIndex(&sqlDashboardLoader{sql: sql}, entityEventStore), + logger: log.New("searchV2"), } } -type dashMeta struct { - id int64 - is_folder bool - folder_id int64 - slug string - created time.Time - updated time.Time - dash *extract.DashboardInfo +func (s *StandardSearchService) IsDisabled() bool { + if s.cfg == nil { + return true + } + return !s.cfg.IsFeatureToggleEnabled(featuremgmt.FlagPanelTitleSearch) } -func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, query DashboardQuery) *backend.DataResponse { +func (s *StandardSearchService) Run(ctx context.Context) error { + return s.dashboardIndex.run(ctx) +} + +func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, _ DashboardQuery) *backend.DataResponse { rsp := &backend.DataResponse{} - // Load and parse all dashboards for given orgId - dash, err := loadDashboards(ctx, orgId, s.sql) + dash, err := s.dashboardIndex.getDashboards(ctx, orgId) if err != nil { rsp.Error = err return rsp @@ -58,13 +71,13 @@ func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *back err = s.sql.GetSignedInUser(ctx, getSignedInUserQuery) if err != nil { - fmt.Printf("error while retrieving user %s\n", err) + s.logger.Error("Error while retrieving user", "error", err) rsp.Error = fmt.Errorf("auth error") return rsp } if getSignedInUserQuery.Result == nil { - fmt.Printf("no user %s", user.Email) + s.logger.Error("No user found", "email", user.Email) rsp.Error = fmt.Errorf("auth error") return rsp } @@ -80,153 +93,22 @@ func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *back return rsp } -func (s *StandardSearchService) applyAuthFilter(user *models.SignedInUser, dash []dashMeta) ([]dashMeta, error) { +func (s *StandardSearchService) applyAuthFilter(user *models.SignedInUser, dash []dashboard) ([]dashboard, error) { filter, err := s.auth.GetDashboardReadFilter(user) if err != nil { return nil, err } // create a list of all viewable dashboards for this user - res := make([]dashMeta, 0, len(dash)) + res := make([]dashboard, 0, len(dash)) for _, dash := range dash { - if filter(dash.dash.UID) || (dash.is_folder && dash.dash.UID == "") { // include the "General" folder + if filter(dash.info.UID) || (dash.isFolder && dash.info.UID == "") { // include the "General" folder res = append(res, dash) } } return res, nil } -type dashDataQueryResult struct { - Id int64 - IsFolder bool `xorm:"is_folder"` - FolderID int64 `xorm:"folder_id"` - Slug string `xorm:"slug"` - Data []byte - Created time.Time - Updated time.Time -} - -type dsQueryResult struct { - UID string `xorm:"uid"` - Type string `xorm:"type"` - Name string `xorm:"name"` - IsDefault bool `xorm:"is_default"` -} - -func loadDashboards(ctx context.Context, orgID int64, sql *sqlstore.SQLStore) ([]dashMeta, error) { - meta := make([]dashMeta, 0, 200) - - // Add the root folder ID (does not exist in SQL) - meta = append(meta, dashMeta{ - id: 0, - is_folder: true, - folder_id: 0, - slug: "", - created: time.Now(), - updated: time.Now(), - dash: &extract.DashboardInfo{ - ID: 0, - UID: "", - Title: "General", - }, - }) - - // key will allow name or uid - lookup, err := loadDatasoureLookup(ctx, orgID, sql) - if err != nil { - return meta, err - } - - err = sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { - rows := make([]*dashDataQueryResult, 0) - - sess.Table("dashboard"). - Where("org_id = ?", orgID). - Cols("id", "is_folder", "folder_id", "data", "slug", "created", "updated") - - err := sess.Find(&rows) - if err != nil { - return err - } - - for _, row := range rows { - dash := extract.ReadDashboard(bytes.NewReader(row.Data), lookup) - - meta = append(meta, dashMeta{ - id: row.Id, - is_folder: row.IsFolder, - folder_id: row.FolderID, - slug: row.Slug, - created: row.Created, - updated: row.Updated, - dash: dash, - }) - } - - return nil - }) - - return meta, err -} - -func loadDatasoureLookup(ctx context.Context, orgID int64, sql *sqlstore.SQLStore) (extract.DatasourceLookup, error) { - byUID := make(map[string]*extract.DataSourceRef, 50) - byName := make(map[string]*extract.DataSourceRef, 50) - var defaultDS *extract.DataSourceRef - - err := sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { - rows := make([]*dsQueryResult, 0) - sess.Table("data_source"). - Where("org_id = ?", orgID). - Cols("uid", "name", "type", "is_default") - - err := sess.Find(&rows) - if err != nil { - return err - } - - for _, row := range rows { - ds := &extract.DataSourceRef{ - UID: row.UID, - Type: row.Type, - } - byUID[row.UID] = ds - byName[row.Name] = ds - if row.IsDefault { - defaultDS = ds - } - } - - return nil - }) - if err != nil { - return nil, err - } - - // Lookup by UID or name - return func(ref *extract.DataSourceRef) *extract.DataSourceRef { - if ref == nil { - return defaultDS - } - key := "" - if ref.UID != "" { - ds, ok := byUID[ref.UID] - if ok { - return ds - } - key = ref.UID - } - if key == "" { - return defaultDS - } - ds, ok := byUID[key] - if ok { - return ds - } - return byName[key] - }, err -} - type simpleCounter struct { values map[string]int64 } @@ -250,7 +132,7 @@ func (c *simpleCounter) toFrame(name string) *data.Frame { } // UGLY... but helpful for now -func metaToFrame(meta []dashMeta) data.Frames { +func metaToFrame(meta []dashboard) data.Frames { folderID := data.NewFieldFromFieldType(data.FieldTypeInt64, 0) folderUID := data.NewFieldFromFieldType(data.FieldTypeString, 0) folderName := data.NewFieldFromFieldType(data.FieldTypeString, 0) @@ -324,43 +206,43 @@ func metaToFrame(meta []dashMeta) data.Frames { folderCounter := make(map[int64]int64, 20) for _, row := range meta { - if row.is_folder { + if row.isFolder { folderID.Append(row.id) - folderUID.Append(row.dash.UID) - folderName.Append(row.dash.Title) + folderUID.Append(row.info.UID) + folderName.Append(row.info.Title) folderDashCount.Append(int64(0)) // filled in later continue } dashID.Append(row.id) - dashUID.Append(row.dash.UID) - dashFolderID.Append(row.folder_id) - dashName.Append(row.dash.Title) - dashDescr.Append(row.dash.Title) - dashSchemaVersion.Append(row.dash.SchemaVersion) + dashUID.Append(row.info.UID) + dashFolderID.Append(row.folderID) + dashName.Append(row.info.Title) + dashDescr.Append(row.info.Title) + dashSchemaVersion.Append(row.info.SchemaVersion) dashCreated.Append(row.created) dashUpdated.Append(row.updated) // Increment the folder counter - fcount, ok := folderCounter[row.folder_id] + fcount, ok := folderCounter[row.folderID] if !ok { fcount = 0 } - folderCounter[row.folder_id] = fcount + 1 + folderCounter[row.folderID] = fcount + 1 - url := fmt.Sprintf("/d/%s/%s", row.dash.UID, row.slug) + url := fmt.Sprintf("/d/%s/%s", row.info.UID, row.slug) dashURL.Append(url) // stats - schemaVersionCounter.add(strconv.FormatInt(row.dash.SchemaVersion, 10)) + schemaVersionCounter.add(strconv.FormatInt(row.info.SchemaVersion, 10)) - dashTags.Append(toJSONString(row.dash.Tags)) - dashPanelCount.Append(int64(len(row.dash.Panels))) - dashVarCount.Append(int64(len(row.dash.TemplateVars))) - dashDSList.Append(dsAsJSONString(row.dash.Datasource)) + dashTags.Append(toJSONString(row.info.Tags)) + dashPanelCount.Append(int64(len(row.info.Panels))) + dashVarCount.Append(int64(len(row.info.TemplateVars))) + dashDSList.Append(dsAsJSONString(row.info.Datasource)) // Row for each panel - for _, panel := range row.dash.Panels { + for _, panel := range row.info.Panels { panelDashID.Append(row.id) panelID.Append(panel.ID) panelName.Append(panel.Title) diff --git a/pkg/services/searchV2/stub.go b/pkg/services/searchV2/stub.go index c1ba96fd393..b02e34695ea 100644 --- a/pkg/services/searchV2/stub.go +++ b/pkg/services/searchV2/stub.go @@ -28,3 +28,7 @@ func (s *stubSearchService) DoDashboardQuery(ctx context.Context, user *backend. return rsp } + +func (s *stubSearchService) Run(_ context.Context) error { + return nil +} diff --git a/pkg/services/searchV2/types.go b/pkg/services/searchV2/types.go index 1730a6f4b1a..12fddfb6d3e 100644 --- a/pkg/services/searchV2/types.go +++ b/pkg/services/searchV2/types.go @@ -3,6 +3,8 @@ package searchV2 import ( "context" + "github.com/grafana/grafana/pkg/registry" + "github.com/grafana/grafana-plugin-sdk-go/backend" ) @@ -11,5 +13,6 @@ type DashboardQuery struct { } type SearchService interface { + registry.BackgroundService DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, query DashboardQuery) *backend.DataResponse } diff --git a/pkg/services/sqlstore/migrations/entity_events_mig.go b/pkg/services/sqlstore/migrations/entity_events_mig.go new file mode 100644 index 00000000000..c3b61e58239 --- /dev/null +++ b/pkg/services/sqlstore/migrations/entity_events_mig.go @@ -0,0 +1,18 @@ +package migrations + +import . "github.com/grafana/grafana/pkg/services/sqlstore/migrator" + +func addEntityEventsTableMigration(mg *Migrator) { + entityEventsTable := Table{ + Name: "entity_event", + Columns: []*Column{ + {Name: "id", Type: DB_BigInt, Nullable: false, IsPrimaryKey: true, IsAutoIncrement: true}, + {Name: "entity_id", Type: DB_NVarchar, Length: 1024, Nullable: false}, + {Name: "event_type", Type: DB_NVarchar, Length: 8, Nullable: false}, + {Name: "created", Type: DB_BigInt, Nullable: false}, + }, + Indices: []*Index{}, + } + + mg.AddMigration("create entity_events table", NewAddTableMigration(entityEventsTable)) +} diff --git a/pkg/services/sqlstore/migrations/migrations.go b/pkg/services/sqlstore/migrations/migrations.go index 8877988e1ab..4df98b3f495 100644 --- a/pkg/services/sqlstore/migrations/migrations.go +++ b/pkg/services/sqlstore/migrations/migrations.go @@ -87,6 +87,8 @@ func (*OSSMigrations) AddMigration(mg *Migrator) { addCommentMigrations(mg) } } + + addEntityEventsTableMigration(mg) } func addMigrationLogMigrations(mg *Migrator) { diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index f4833d90235..a4cc75cb657 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -438,6 +438,7 @@ type InitTestDBOpt struct { var featuresEnabledDuringTests = []string{ featuremgmt.FlagDashboardPreviews, featuremgmt.FlagDashboardComments, + featuremgmt.FlagPanelTitleSearch, } // InitTestDBWithMigration initializes the test DB given custom migrations. diff --git a/pkg/services/store/entity_events.go b/pkg/services/store/entity_events.go new file mode 100644 index 00000000000..a0c8a45bf12 --- /dev/null +++ b/pkg/services/store/entity_events.go @@ -0,0 +1,179 @@ +package store + +import ( + "context" + "fmt" + "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/registry" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/grafana/grafana/pkg/setting" +) + +type EntityEventType string + +const ( + EntityEventTypeDelete EntityEventType = "delete" + EntityEventTypeCreate EntityEventType = "create" + EntityEventTypeUpdate EntityEventType = "update" +) + +type EntityType string + +const ( + EntityTypeDashboard EntityType = "dashboard" +) + +// CreateDatabaseEntityId creates entityId for entities stored in the existing SQL tables +func CreateDatabaseEntityId(internalId interface{}, orgId int64, entityType EntityType) string { + var internalIdAsString string + switch id := internalId.(type) { + case string: + internalIdAsString = id + default: + internalIdAsString = fmt.Sprintf("%#v", internalId) + } + + return fmt.Sprintf("database/%d/%s/%s", orgId, entityType, internalIdAsString) +} + +type EntityEvent struct { + Id int64 + EventType EntityEventType + EntityId string + Created int64 +} + +type SaveEventCmd struct { + EntityId string + EventType EntityEventType +} + +// EntityEventsService is a temporary solution to support change notifications in an HA setup +// With this service each system can query for any events that have happened since a fixed time +//go:generate mockery --name EntityEventsService --structname MockEntityEventsService --inpackage --filename entity_events_mock.go +type EntityEventsService interface { + registry.BackgroundService + registry.CanBeDisabled + SaveEvent(ctx context.Context, cmd SaveEventCmd) error + GetLastEvent(ctx context.Context) (*EntityEvent, error) + GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error) + + deleteEventsOlderThan(ctx context.Context, duration time.Duration) error +} + +func ProvideEntityEventsService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, features featuremgmt.FeatureToggles) EntityEventsService { + if !features.IsEnabled(featuremgmt.FlagPanelTitleSearch) { + return &dummyEntityEventsService{} + } + + return &entityEventService{ + sql: sqlStore, + features: features, + log: log.New("entity-events"), + } +} + +type entityEventService struct { + sql *sqlstore.SQLStore + log log.Logger + features featuremgmt.FeatureToggles +} + +func (e *entityEventService) SaveEvent(ctx context.Context, cmd SaveEventCmd) error { + return e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { + _, err := sess.Insert(&EntityEvent{ + EventType: cmd.EventType, + EntityId: cmd.EntityId, + Created: time.Now().Unix(), + }) + return err + }) +} + +func (e *entityEventService) GetLastEvent(ctx context.Context) (*EntityEvent, error) { + var entityEvent *EntityEvent + err := e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { + bean := &EntityEvent{} + found, err := sess.OrderBy("id desc").Get(bean) + if found { + entityEvent = bean + } + return err + }) + + return entityEvent, err +} + +func (e *entityEventService) GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error) { + var evs = make([]*EntityEvent, 0) + err := e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { + return sess.OrderBy("id asc").Where("id > ?", id).Find(&evs) + }) + + return evs, err +} + +func (e *entityEventService) deleteEventsOlderThan(ctx context.Context, duration time.Duration) error { + return e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error { + maxCreated := time.Now().Add(-duration) + deletedCount, err := sess.Where("created < ?", maxCreated.Unix()).Delete(&EntityEvent{}) + e.log.Info("deleting old events", "count", deletedCount, "maxCreated", maxCreated) + return err + }) +} + +func (e *entityEventService) IsDisabled() bool { + return false +} + +func (e *entityEventService) Run(ctx context.Context) error { + clean := time.NewTicker(1 * time.Hour) + + for { + select { + case <-clean.C: + go func() { + err := e.deleteEventsOlderThan(context.Background(), 24*time.Hour) + if err != nil { + e.log.Info("failed to delete old entity events", "error", err) + } + }() + case <-ctx.Done(): + e.log.Debug("Grafana is shutting down - stopping entity events service") + clean.Stop() + return nil + } + } +} + +type dummyEntityEventsService struct { +} + +func (d dummyEntityEventsService) Run(ctx context.Context) error { + return nil +} + +func (d dummyEntityEventsService) IsDisabled() bool { + return false +} + +func (d dummyEntityEventsService) SaveEvent(ctx context.Context, cmd SaveEventCmd) error { + return nil +} + +func (d dummyEntityEventsService) GetLastEvent(ctx context.Context) (*EntityEvent, error) { + return nil, nil +} + +func (d dummyEntityEventsService) GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error) { + return make([]*EntityEvent, 0), nil +} + +func (d dummyEntityEventsService) deleteEventsOlderThan(ctx context.Context, duration time.Duration) error { + return nil +} + +var _ EntityEventsService = &dummyEntityEventsService{} diff --git a/pkg/services/store/entity_events_mock.go b/pkg/services/store/entity_events_mock.go new file mode 100644 index 00000000000..4360f8921a1 --- /dev/null +++ b/pkg/services/store/entity_events_mock.go @@ -0,0 +1,117 @@ +// Code generated by mockery v2.10.6. DO NOT EDIT. + +package store + +import ( + context "context" + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// MockEntityEventsService is an autogenerated mock type for the EntityEventsService type +type MockEntityEventsService struct { + mock.Mock +} + +// GetAllEventsAfter provides a mock function with given fields: ctx, id +func (_m *MockEntityEventsService) GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error) { + ret := _m.Called(ctx, id) + + var r0 []*EntityEvent + if rf, ok := ret.Get(0).(func(context.Context, int64) []*EntityEvent); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*EntityEvent) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetLastEvent provides a mock function with given fields: ctx +func (_m *MockEntityEventsService) GetLastEvent(ctx context.Context) (*EntityEvent, error) { + ret := _m.Called(ctx) + + var r0 *EntityEvent + if rf, ok := ret.Get(0).(func(context.Context) *EntityEvent); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*EntityEvent) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// IsDisabled provides a mock function with given fields: +func (_m *MockEntityEventsService) IsDisabled() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Run provides a mock function with given fields: ctx +func (_m *MockEntityEventsService) Run(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SaveEvent provides a mock function with given fields: ctx, cmd +func (_m *MockEntityEventsService) SaveEvent(ctx context.Context, cmd SaveEventCmd) error { + ret := _m.Called(ctx, cmd) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, SaveEventCmd) error); ok { + r0 = rf(ctx, cmd) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// deleteEventsOlderThan provides a mock function with given fields: ctx, duration +func (_m *MockEntityEventsService) deleteEventsOlderThan(ctx context.Context, duration time.Duration) error { + ret := _m.Called(ctx, duration) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, time.Duration) error); ok { + r0 = rf(ctx, duration) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/pkg/services/store/entity_events_test.go b/pkg/services/store/entity_events_test.go new file mode 100644 index 00000000000..7f6ef9c99ca --- /dev/null +++ b/pkg/services/store/entity_events_test.go @@ -0,0 +1,182 @@ +//go:build integration +// +build integration + +package store + +import ( + "context" + "testing" + "time" + + "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/sqlstore" + "github.com/stretchr/testify/require" +) + +func TestEntityEventsService(t *testing.T) { + var ctx context.Context + var service EntityEventsService + + setup := func() { + service = &entityEventService{ + sql: sqlstore.InitTestDB(t), + log: log.New("entity-event-test"), + } + ctx = context.Background() + } + + t.Run("Should insert an entity event", func(t *testing.T) { + setup() + + err := service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/1", + EventType: EntityEventTypeCreate, + }) + require.NoError(t, err) + }) + + t.Run("Should retrieve nil entity if database is empty", func(t *testing.T) { + setup() + + ev, err := service.GetLastEvent(ctx) + require.NoError(t, err) + require.Nil(t, ev) + }) + + t.Run("Should retrieve last entity event", func(t *testing.T) { + setup() + lastEventEntityId := "database/dash/1" + + err := service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/3", + EventType: EntityEventTypeCreate, + }) + require.NoError(t, err) + err = service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/2", + EventType: EntityEventTypeCreate, + }) + require.NoError(t, err) + err = service.SaveEvent(ctx, SaveEventCmd{ + EntityId: lastEventEntityId, + EventType: EntityEventTypeCreate, + }) + require.NoError(t, err) + + lastEv, err := service.GetLastEvent(ctx) + require.NoError(t, err) + require.Equal(t, lastEventEntityId, lastEv.EntityId) + }) + + t.Run("Should retrieve sorted events after an id", func(t *testing.T) { + setup() + lastEventEntityId := "database/dash/1" + + err := service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/3", + EventType: EntityEventTypeCreate, + }) + require.NoError(t, err) + firstEv, err := service.GetLastEvent(ctx) + firstEvId := firstEv.Id + + err = service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/2", + EventType: EntityEventTypeCreate, + }) + require.NoError(t, err) + err = service.SaveEvent(ctx, SaveEventCmd{ + EntityId: lastEventEntityId, + EventType: EntityEventTypeCreate, + }) + require.NoError(t, err) + + evs, err := service.GetAllEventsAfter(ctx, firstEvId) + require.NoError(t, err) + require.Len(t, evs, 2) + require.Equal(t, evs[0].EntityId, "database/dash/2") + require.Equal(t, evs[1].EntityId, lastEventEntityId) + }) + + t.Run("Should delete old events", func(t *testing.T) { + setup() + _ = service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/3", + EventType: EntityEventTypeCreate, + }) + _ = service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/2", + EventType: EntityEventTypeCreate, + }) + _ = service.SaveEvent(ctx, SaveEventCmd{ + EntityId: "database/dash/1", + EventType: EntityEventTypeCreate, + }) + + evs, err := service.GetAllEventsAfter(ctx, 0) + require.NoError(t, err) + require.Len(t, evs, 3) + + err = service.deleteEventsOlderThan(ctx, 24*time.Hour) + require.NoError(t, err) + + // did not delete any events + evs, err = service.GetAllEventsAfter(ctx, 0) + require.NoError(t, err) + require.Len(t, evs, 3) + + time.Sleep(2 * time.Second) + err = service.deleteEventsOlderThan(ctx, 1*time.Second) + require.NoError(t, err) + + // deleted all events + evs, err = service.GetAllEventsAfter(ctx, 0) + require.NoError(t, err) + require.Len(t, evs, 0) + }) +} + +func TestCreateDatabaseEntityId(t *testing.T) { + tests := []struct { + name string + entityType EntityType + orgId int64 + internalId interface{} + expected string + }{ + { + name: "int64 internal id", + entityType: EntityTypeDashboard, + orgId: 10, + internalId: int64(45), + expected: "database/10/dashboard/45", + }, + { + name: "big-ish int64 internal id", + entityType: EntityTypeDashboard, + orgId: 10, + internalId: int64(12412421), + expected: "database/10/dashboard/12412421", + }, + { + name: "int internal id", + entityType: EntityTypeDashboard, + orgId: 10, + internalId: int(1244), + expected: "database/10/dashboard/1244", + }, + { + name: "string internal id", + entityType: EntityTypeDashboard, + orgId: 10, + internalId: "string-internal-id", + expected: "database/10/dashboard/string-internal-id", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, CreateDatabaseEntityId(tt.internalId, tt.orgId, tt.entityType)) + }) + } +}