mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Search: in-memory index (#47709)
* #45498: add entity events table
* #45498: add entity events service
* #45498: hook up entity events service to http server
* #45498: use `dashboards.id` rather than `uid` and `org_id` in grn
* Update pkg/services/entityevents/service.go
Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
* #45498: move entityeventsservice to services/store
* #45498: add null check
* #45498: rename
* #45498: fix comment
* #45498: switch grn back to uid
* Search: listen for updates (#47719)
* #45498: wire entity event service with searchv2
* load last event id before building index for org 1
* fix service init in integration tests
* depend on required subset of event store methods
* Update pkg/services/sqlstore/migrations/entity_events_mig.go
Co-authored-by: Alexander Emelin <frvzmb@gmail.com>
* #45498: pointer receiver
* #45498: mockery!
* #45498: add entity events service to background services
* dashboard query pagination, allow queries while re-indexing
* log level cleanups, use rlock, add comments
* fix lint, check feature toggle in search v2 service
* use unix time for event created column
* add missing changes for created column
* fix integration tests init
* log re-index execution times on info level
* #45498: fix entityEventsService tests
* #45498: save events on dashboard delete
* use camel case for log labels
* formatting
* #45498: rename grn to entityid
* #45498: add `IsDisabled` to entityEventsService
* #45498: remove feature flag from migration
* better context usage, fix capacity, comments/cleanups
* replace print with logger
* Revert "#45498: remove feature flag from migration"
This reverts commit ed23968898
.
* revert:revert:revert conditional feature flag
Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
Co-authored-by: Alexander Emelin <frvzmb@gmail.com>
This commit is contained in:
parent
8764040fe3
commit
25e153e4e7
@ -22,6 +22,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/guardian"
|
||||
pref "github.com/grafana/grafana/pkg/services/preference"
|
||||
"github.com/grafana/grafana/pkg/services/store"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
"github.com/grafana/grafana/pkg/web"
|
||||
)
|
||||
@ -277,6 +278,16 @@ func (hs *HTTPServer) deleteDashboard(c *models.ReqContext) response.Response {
|
||||
}
|
||||
return response.Error(500, "Failed to delete dashboard", err)
|
||||
}
|
||||
|
||||
if hs.entityEventsService != nil {
|
||||
if err := hs.entityEventsService.SaveEvent(c.Req.Context(), store.SaveEventCmd{
|
||||
EntityId: store.CreateDatabaseEntityId(dash.Uid, dash.OrgId, store.EntityTypeDashboard),
|
||||
EventType: store.EntityEventTypeDelete,
|
||||
}); err != nil {
|
||||
hs.log.Warn("failed to save dashboard entity event", "uid", dash.Uid, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if hs.Live != nil {
|
||||
err := hs.Live.GrafanaScope.Dashboards.DashboardDeleted(c.OrgId, c.ToUserDisplayDTO(), dash.Uid)
|
||||
if err != nil {
|
||||
@ -362,6 +373,15 @@ func (hs *HTTPServer) postDashboard(c *models.ReqContext, cmd models.SaveDashboa
|
||||
|
||||
dashboard, err := hs.dashboardService.SaveDashboard(alerting.WithUAEnabled(ctx, hs.Cfg.UnifiedAlerting.IsEnabled()), dashItem, allowUiUpdate)
|
||||
|
||||
if dashboard != nil && hs.entityEventsService != nil {
|
||||
if err := hs.entityEventsService.SaveEvent(ctx, store.SaveEventCmd{
|
||||
EntityId: store.CreateDatabaseEntityId(dashboard.Uid, dashboard.OrgId, store.EntityTypeDashboard),
|
||||
EventType: store.EntityEventTypeUpdate,
|
||||
}); err != nil {
|
||||
hs.log.Warn("failed to save dashboard entity event", "uid", dashboard.Uid, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
if hs.Live != nil {
|
||||
// Tell everyone listening that the dashboard changed
|
||||
if dashboard == nil {
|
||||
|
@ -151,6 +151,7 @@ type HTTPServer struct {
|
||||
PluginSettings *pluginSettings.Service
|
||||
AvatarCacheServer *avatar.AvatarCacheServer
|
||||
preferenceService pref.Service
|
||||
entityEventsService store.EntityEventsService
|
||||
}
|
||||
|
||||
type ServerOptions struct {
|
||||
@ -182,7 +183,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
|
||||
dashboardProvisioningService dashboards.DashboardProvisioningService, folderService dashboards.FolderService,
|
||||
datasourcePermissionsService permissions.DatasourcePermissionsService, alertNotificationService *alerting.AlertNotificationService,
|
||||
dashboardsnapshotsService *dashboardsnapshots.Service, commentsService *comments.Service, pluginSettings *pluginSettings.Service,
|
||||
avatarCacheServer *avatar.AvatarCacheServer, preferenceService pref.Service,
|
||||
avatarCacheServer *avatar.AvatarCacheServer, preferenceService pref.Service, entityEventsService store.EntityEventsService,
|
||||
) (*HTTPServer, error) {
|
||||
web.Env = cfg.Env
|
||||
m := web.New()
|
||||
@ -257,6 +258,7 @@ func ProvideHTTPServer(opts ServerOptions, cfg *setting.Cfg, routeRegister routi
|
||||
permissionServices: permissionsServices,
|
||||
AvatarCacheServer: avatarCacheServer,
|
||||
preferenceService: preferenceService,
|
||||
entityEventsService: entityEventsService,
|
||||
}
|
||||
if hs.Listener != nil {
|
||||
hs.log.Debug("Using provided listener")
|
||||
|
@ -86,7 +86,7 @@ func TestPluginManager_int_init(t *testing.T) {
|
||||
pg := postgres.ProvideService(cfg)
|
||||
my := mysql.ProvideService(cfg, hcp)
|
||||
ms := mssql.ProvideService(cfg)
|
||||
sv2 := searchV2.ProvideService(sqlstore.InitTestDB(t))
|
||||
sv2 := searchV2.ProvideService(cfg, sqlstore.InitTestDB(t), nil)
|
||||
graf := grafanads.ProvideService(cfg, sv2, nil)
|
||||
|
||||
coreRegistry := coreplugin.ProvideCoreRegistry(am, cw, cm, es, grap, idb, lk, otsdb, pr, tmpo, td, pg, my, ms, graf)
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
plugindashboardsservice "github.com/grafana/grafana/pkg/services/plugindashboards/service"
|
||||
"github.com/grafana/grafana/pkg/services/provisioning"
|
||||
"github.com/grafana/grafana/pkg/services/rendering"
|
||||
"github.com/grafana/grafana/pkg/services/searchV2"
|
||||
secretsManager "github.com/grafana/grafana/pkg/services/secrets/manager"
|
||||
"github.com/grafana/grafana/pkg/services/serviceaccounts"
|
||||
"github.com/grafana/grafana/pkg/services/store"
|
||||
@ -36,7 +37,7 @@ func ProvideBackgroundServiceRegistry(
|
||||
statsCollector *statscollector.Service, grafanaUpdateChecker *updatechecker.GrafanaService,
|
||||
pluginsUpdateChecker *updatechecker.PluginsService, metrics *metrics.InternalMetricsService,
|
||||
secretsService *secretsManager.SecretsService, remoteCache *remotecache.RemoteCache,
|
||||
thumbnailsService thumbs.Service, StorageService store.StorageService,
|
||||
thumbnailsService thumbs.Service, StorageService store.StorageService, searchService searchV2.SearchService, entityEventsService store.EntityEventsService,
|
||||
// Need to make sure these are initialized, is there a better place to put them?
|
||||
_ *dashboardsnapshots.Service, _ *alerting.AlertNotificationService,
|
||||
_ serviceaccounts.Service, _ *guardian.Provider,
|
||||
@ -64,6 +65,8 @@ func ProvideBackgroundServiceRegistry(
|
||||
secretsService,
|
||||
StorageService,
|
||||
thumbnailsService,
|
||||
searchService,
|
||||
entityEventsService,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -5,7 +5,6 @@ package server
|
||||
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
|
||||
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
|
||||
"github.com/grafana/grafana/pkg/api"
|
||||
@ -153,6 +152,7 @@ var wireBasicSet = wire.NewSet(
|
||||
postgres.ProvideService,
|
||||
mysql.ProvideService,
|
||||
mssql.ProvideService,
|
||||
store.ProvideEntityEventsService,
|
||||
httpclientprovider.New,
|
||||
wire.Bind(new(httpclient.Provider), new(*sdkhttpclient.Provider)),
|
||||
serverlock.ProvideService,
|
||||
|
406
pkg/services/searchV2/index.go
Normal file
406
pkg/services/searchV2/index.go
Normal file
@ -0,0 +1,406 @@
|
||||
package searchV2
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/searchV2/extract"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/services/store"
|
||||
)
|
||||
|
||||
type dashboardLoader interface {
|
||||
// LoadDashboards returns slice of dashboards. If dashboardUID is empty – then
|
||||
// implementation must return all dashboards in instance to build an entire
|
||||
// dashboard index for an organization. If dashboardUID is not empty – then only
|
||||
// return dashboard with specified UID or empty slice if not found (this is required
|
||||
// to apply partial update).
|
||||
LoadDashboards(ctx context.Context, orgID int64, dashboardUID string) ([]dashboard, error)
|
||||
}
|
||||
|
||||
type eventStore interface {
|
||||
GetLastEvent(ctx context.Context) (*store.EntityEvent, error)
|
||||
GetAllEventsAfter(ctx context.Context, id int64) ([]*store.EntityEvent, error)
|
||||
}
|
||||
|
||||
type dashboardIndex struct {
|
||||
mu sync.RWMutex
|
||||
loader dashboardLoader
|
||||
dashboards map[int64][]dashboard // orgId -> []dashboards
|
||||
eventStore eventStore
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
type dashboard struct {
|
||||
id int64
|
||||
uid string
|
||||
isFolder bool
|
||||
folderID int64
|
||||
slug string
|
||||
created time.Time
|
||||
updated time.Time
|
||||
info *extract.DashboardInfo
|
||||
}
|
||||
|
||||
func newDashboardIndex(dashLoader dashboardLoader, evStore eventStore) *dashboardIndex {
|
||||
return &dashboardIndex{
|
||||
loader: dashLoader,
|
||||
eventStore: evStore,
|
||||
dashboards: map[int64][]dashboard{},
|
||||
logger: log.New("dashboardIndex"),
|
||||
}
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) run(ctx context.Context) error {
|
||||
fullReIndexTicker := time.NewTicker(5 * time.Minute)
|
||||
defer fullReIndexTicker.Stop()
|
||||
|
||||
partialUpdateTicker := time.NewTicker(5 * time.Second)
|
||||
defer partialUpdateTicker.Stop()
|
||||
|
||||
var lastEventID int64
|
||||
lastEvent, err := i.eventStore.GetLastEvent(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lastEvent != nil {
|
||||
lastEventID = lastEvent.Id
|
||||
}
|
||||
|
||||
// Build on start for orgID 1 but keep lazy for others.
|
||||
_, err = i.getDashboards(ctx, 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't build dashboard search index for org ID 1: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-partialUpdateTicker.C:
|
||||
lastEventID = i.applyIndexUpdates(ctx, lastEventID)
|
||||
case <-fullReIndexTicker.C:
|
||||
started := time.Now()
|
||||
i.reIndexFromScratch(ctx)
|
||||
i.logger.Info("Full re-indexing finished", "fullReIndexElapsed", time.Since(started))
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) reIndexFromScratch(ctx context.Context) {
|
||||
i.mu.RLock()
|
||||
orgIDs := make([]int64, 0, len(i.dashboards))
|
||||
for orgID := range i.dashboards {
|
||||
orgIDs = append(orgIDs, orgID)
|
||||
}
|
||||
i.mu.RUnlock()
|
||||
|
||||
for _, orgID := range orgIDs {
|
||||
started := time.Now()
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
dashboards, err := i.loader.LoadDashboards(ctx, orgID, "")
|
||||
if err != nil {
|
||||
cancel()
|
||||
i.logger.Error("Error re-indexing dashboards for organization", "orgId", orgID, "error", err)
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
i.logger.Info("Re-indexed dashboards for organization", "orgId", orgID, "orgReIndexElapsed", time.Since(started))
|
||||
i.mu.Lock()
|
||||
i.dashboards[orgID] = dashboards
|
||||
i.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) applyIndexUpdates(ctx context.Context, lastEventID int64) int64 {
|
||||
events, err := i.eventStore.GetAllEventsAfter(context.Background(), lastEventID)
|
||||
if err != nil {
|
||||
i.logger.Error("can't load events", "error", err)
|
||||
return lastEventID
|
||||
}
|
||||
if len(events) == 0 {
|
||||
return lastEventID
|
||||
}
|
||||
started := time.Now()
|
||||
for _, e := range events {
|
||||
i.logger.Debug("processing event", "event", e)
|
||||
err := i.applyEventOnIndex(ctx, e)
|
||||
if err != nil {
|
||||
i.logger.Error("can't apply event", "error", err)
|
||||
return lastEventID
|
||||
}
|
||||
lastEventID = e.Id
|
||||
}
|
||||
i.logger.Info("Index updates applied", "indexEventsAppliedElapsed", time.Since(started), "numEvents", len(events))
|
||||
return lastEventID
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) applyEventOnIndex(ctx context.Context, e *store.EntityEvent) error {
|
||||
if !strings.HasPrefix(e.EntityId, "database/") {
|
||||
i.logger.Warn("unknown storage", "entityId", e.EntityId)
|
||||
return nil
|
||||
}
|
||||
parts := strings.Split(strings.TrimPrefix(e.EntityId, "database/"), "/")
|
||||
if len(parts) != 3 {
|
||||
i.logger.Error("can't parse entityId", "entityId", e.EntityId)
|
||||
return nil
|
||||
}
|
||||
orgIDStr := parts[0]
|
||||
kind := parts[1]
|
||||
dashboardUID := parts[2]
|
||||
if kind != "dashboard" {
|
||||
i.logger.Error("unknown kind in entityId", "entityId", e.EntityId)
|
||||
return nil
|
||||
}
|
||||
orgID, err := strconv.Atoi(orgIDStr)
|
||||
if err != nil {
|
||||
i.logger.Error("can't extract org ID", "entityId", e.EntityId)
|
||||
return nil
|
||||
}
|
||||
return i.applyDashboardEvent(ctx, int64(orgID), dashboardUID, e.EventType)
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) applyDashboardEvent(ctx context.Context, orgID int64, dashboardUID string, _ store.EntityEventType) error {
|
||||
i.mu.Lock()
|
||||
_, ok := i.dashboards[orgID]
|
||||
if !ok {
|
||||
// Skip event for org not yet indexed.
|
||||
i.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
i.mu.Unlock()
|
||||
|
||||
dbDashboards, err := i.loader.LoadDashboards(ctx, orgID, dashboardUID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
dashboards, ok := i.dashboards[orgID]
|
||||
if !ok {
|
||||
// Skip event for org not yet fully indexed.
|
||||
return nil
|
||||
}
|
||||
|
||||
// In the future we can rely on operation types to reduce work here.
|
||||
if len(dbDashboards) == 0 {
|
||||
// Delete.
|
||||
i.dashboards[orgID] = removeDashboard(dashboards, dashboardUID)
|
||||
} else {
|
||||
updated := false
|
||||
for i, d := range dashboards {
|
||||
if d.uid == dashboardUID {
|
||||
// Update.
|
||||
dashboards[i] = dbDashboards[0]
|
||||
updated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !updated {
|
||||
// Create.
|
||||
dashboards = append(dashboards, dbDashboards...)
|
||||
}
|
||||
i.dashboards[orgID] = dashboards
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeDashboard(dashboards []dashboard, dashboardUID string) []dashboard {
|
||||
k := 0
|
||||
for _, d := range dashboards {
|
||||
if d.uid != dashboardUID {
|
||||
dashboards[k] = d
|
||||
k++
|
||||
}
|
||||
}
|
||||
return dashboards[:k]
|
||||
}
|
||||
|
||||
func (i *dashboardIndex) getDashboards(ctx context.Context, orgId int64) ([]dashboard, error) {
|
||||
var dashboards []dashboard
|
||||
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
if cachedDashboards, ok := i.dashboards[orgId]; ok {
|
||||
dashboards = cachedDashboards
|
||||
} else {
|
||||
// Load and parse all dashboards for given orgId.
|
||||
var err error
|
||||
dashboards, err = i.loader.LoadDashboards(ctx, orgId, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i.dashboards[orgId] = dashboards
|
||||
}
|
||||
return dashboards, nil
|
||||
}
|
||||
|
||||
type sqlDashboardLoader struct {
|
||||
sql *sqlstore.SQLStore
|
||||
}
|
||||
|
||||
func (l sqlDashboardLoader) LoadDashboards(ctx context.Context, orgID int64, dashboardUID string) ([]dashboard, error) {
|
||||
var dashboards []dashboard
|
||||
|
||||
limit := 1
|
||||
|
||||
if dashboardUID == "" {
|
||||
limit = 200
|
||||
dashboards = make([]dashboard, 0, limit+1)
|
||||
|
||||
// Add the root folder ID (does not exist in SQL).
|
||||
dashboards = append(dashboards, dashboard{
|
||||
id: 0,
|
||||
isFolder: true,
|
||||
folderID: 0,
|
||||
slug: "",
|
||||
created: time.Now(),
|
||||
updated: time.Now(),
|
||||
info: &extract.DashboardInfo{
|
||||
ID: 0,
|
||||
UID: "",
|
||||
Title: "General",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// key will allow name or uid
|
||||
lookup, err := loadDatasourceLookup(ctx, orgID, l.sql)
|
||||
if err != nil {
|
||||
return dashboards, err
|
||||
}
|
||||
|
||||
var lastID int64
|
||||
|
||||
for {
|
||||
rows := make([]*dashboardQueryResult, 0, limit)
|
||||
|
||||
err = l.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
sess.Table("dashboard").
|
||||
Where("org_id = ?", orgID)
|
||||
|
||||
if lastID > 0 {
|
||||
sess.Where("id > ?", lastID)
|
||||
}
|
||||
|
||||
if dashboardUID != "" {
|
||||
sess.Where("uid = ?", dashboardUID)
|
||||
}
|
||||
|
||||
sess.Cols("id", "uid", "is_folder", "folder_id", "data", "slug", "created", "updated")
|
||||
|
||||
sess.Limit(limit)
|
||||
|
||||
return sess.Find(&rows)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
dashboards = append(dashboards, dashboard{
|
||||
id: row.Id,
|
||||
uid: row.Uid,
|
||||
isFolder: row.IsFolder,
|
||||
folderID: row.FolderID,
|
||||
slug: row.Slug,
|
||||
created: row.Created,
|
||||
updated: row.Updated,
|
||||
info: extract.ReadDashboard(bytes.NewReader(row.Data), lookup),
|
||||
})
|
||||
lastID = row.Id
|
||||
}
|
||||
|
||||
if len(rows) < limit || dashboardUID != "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return dashboards, err
|
||||
}
|
||||
|
||||
type dashboardQueryResult struct {
|
||||
Id int64
|
||||
Uid string
|
||||
IsFolder bool `xorm:"is_folder"`
|
||||
FolderID int64 `xorm:"folder_id"`
|
||||
Slug string `xorm:"slug"`
|
||||
Data []byte
|
||||
Created time.Time
|
||||
Updated time.Time
|
||||
}
|
||||
|
||||
type datasourceQueryResult struct {
|
||||
UID string `xorm:"uid"`
|
||||
Type string `xorm:"type"`
|
||||
Name string `xorm:"name"`
|
||||
IsDefault bool `xorm:"is_default"`
|
||||
}
|
||||
|
||||
func loadDatasourceLookup(ctx context.Context, orgID int64, sql *sqlstore.SQLStore) (extract.DatasourceLookup, error) {
|
||||
byUID := make(map[string]*extract.DataSourceRef, 50)
|
||||
byName := make(map[string]*extract.DataSourceRef, 50)
|
||||
var defaultDS *extract.DataSourceRef
|
||||
|
||||
err := sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
rows := make([]*datasourceQueryResult, 0)
|
||||
sess.Table("data_source").
|
||||
Where("org_id = ?", orgID).
|
||||
Cols("uid", "name", "type", "is_default")
|
||||
|
||||
err := sess.Find(&rows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
ds := &extract.DataSourceRef{
|
||||
UID: row.UID,
|
||||
Type: row.Type,
|
||||
}
|
||||
byUID[row.UID] = ds
|
||||
byName[row.Name] = ds
|
||||
if row.IsDefault {
|
||||
defaultDS = ds
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Lookup by UID or name
|
||||
return func(ref *extract.DataSourceRef) *extract.DataSourceRef {
|
||||
if ref == nil {
|
||||
return defaultDS
|
||||
}
|
||||
key := ""
|
||||
if ref.UID != "" {
|
||||
ds, ok := byUID[ref.UID]
|
||||
if ok {
|
||||
return ds
|
||||
}
|
||||
key = ref.UID
|
||||
}
|
||||
if key == "" {
|
||||
return defaultDS
|
||||
}
|
||||
ds, ok := byUID[key]
|
||||
if ok {
|
||||
return ds
|
||||
}
|
||||
return byName[key]
|
||||
}, err
|
||||
}
|
94
pkg/services/searchV2/index_test.go
Normal file
94
pkg/services/searchV2/index_test.go
Normal file
@ -0,0 +1,94 @@
|
||||
package searchV2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/store"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type testDashboardLoader struct {
|
||||
dashboards []dashboard
|
||||
}
|
||||
|
||||
func (t *testDashboardLoader) LoadDashboards(ctx context.Context, orgID int64, dashboardUID string) ([]dashboard, error) {
|
||||
return t.dashboards, nil
|
||||
}
|
||||
|
||||
func TestDashboardIndexCreate(t *testing.T) {
|
||||
dashboardLoader := &testDashboardLoader{
|
||||
dashboards: []dashboard{
|
||||
{
|
||||
uid: "1",
|
||||
},
|
||||
},
|
||||
}
|
||||
index := newDashboardIndex(dashboardLoader, &store.MockEntityEventsService{})
|
||||
require.NotNil(t, index)
|
||||
dashboards, err := index.getDashboards(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, dashboards, 1)
|
||||
|
||||
dashboardLoader.dashboards = []dashboard{
|
||||
{
|
||||
uid: "2",
|
||||
},
|
||||
}
|
||||
err = index.applyDashboardEvent(context.Background(), 1, "2", "")
|
||||
require.NoError(t, err)
|
||||
dashboards, err = index.getDashboards(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, dashboards, 2)
|
||||
}
|
||||
|
||||
func TestDashboardIndexUpdate(t *testing.T) {
|
||||
dashboardLoader := &testDashboardLoader{
|
||||
dashboards: []dashboard{
|
||||
{
|
||||
uid: "1",
|
||||
slug: "test",
|
||||
},
|
||||
},
|
||||
}
|
||||
index := newDashboardIndex(dashboardLoader, nil)
|
||||
require.NotNil(t, index)
|
||||
dashboards, err := index.getDashboards(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, dashboards, 1)
|
||||
|
||||
dashboardLoader.dashboards = []dashboard{
|
||||
{
|
||||
uid: "1",
|
||||
slug: "updated",
|
||||
},
|
||||
}
|
||||
err = index.applyDashboardEvent(context.Background(), 1, "1", "")
|
||||
require.NoError(t, err)
|
||||
dashboards, err = index.getDashboards(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, dashboards, 1)
|
||||
require.Equal(t, "updated", dashboards[0].slug)
|
||||
}
|
||||
|
||||
func TestDashboardIndexDelete(t *testing.T) {
|
||||
dashboardLoader := &testDashboardLoader{
|
||||
dashboards: []dashboard{
|
||||
{
|
||||
uid: "1",
|
||||
},
|
||||
},
|
||||
}
|
||||
index := newDashboardIndex(dashboardLoader, nil)
|
||||
require.NotNil(t, index)
|
||||
dashboards, err := index.getDashboards(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, dashboards, 1)
|
||||
|
||||
dashboardLoader.dashboards = []dashboard{}
|
||||
err = index.applyDashboardEvent(context.Background(), 1, "1", "")
|
||||
require.NoError(t, err)
|
||||
dashboards, err = index.getDashboards(context.Background(), 1)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, dashboards, 0)
|
||||
}
|
@ -1,49 +1,62 @@
|
||||
package searchV2
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/searchV2/extract"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/services/store"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/searchV2/extract"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
)
|
||||
|
||||
type StandardSearchService struct {
|
||||
registry.BackgroundService
|
||||
|
||||
cfg *setting.Cfg
|
||||
sql *sqlstore.SQLStore
|
||||
auth FutureAuthService // eventually injected from elsewhere
|
||||
|
||||
logger log.Logger
|
||||
dashboardIndex *dashboardIndex
|
||||
}
|
||||
|
||||
func ProvideService(sql *sqlstore.SQLStore) SearchService {
|
||||
func ProvideService(cfg *setting.Cfg, sql *sqlstore.SQLStore, entityEventStore store.EntityEventsService) SearchService {
|
||||
return &StandardSearchService{
|
||||
cfg: cfg,
|
||||
sql: sql,
|
||||
auth: &simpleSQLAuthService{
|
||||
sql: sql,
|
||||
},
|
||||
dashboardIndex: newDashboardIndex(&sqlDashboardLoader{sql: sql}, entityEventStore),
|
||||
logger: log.New("searchV2"),
|
||||
}
|
||||
}
|
||||
|
||||
type dashMeta struct {
|
||||
id int64
|
||||
is_folder bool
|
||||
folder_id int64
|
||||
slug string
|
||||
created time.Time
|
||||
updated time.Time
|
||||
dash *extract.DashboardInfo
|
||||
func (s *StandardSearchService) IsDisabled() bool {
|
||||
if s.cfg == nil {
|
||||
return true
|
||||
}
|
||||
return !s.cfg.IsFeatureToggleEnabled(featuremgmt.FlagPanelTitleSearch)
|
||||
}
|
||||
|
||||
func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, query DashboardQuery) *backend.DataResponse {
|
||||
func (s *StandardSearchService) Run(ctx context.Context) error {
|
||||
return s.dashboardIndex.run(ctx)
|
||||
}
|
||||
|
||||
func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, _ DashboardQuery) *backend.DataResponse {
|
||||
rsp := &backend.DataResponse{}
|
||||
|
||||
// Load and parse all dashboards for given orgId
|
||||
dash, err := loadDashboards(ctx, orgId, s.sql)
|
||||
dash, err := s.dashboardIndex.getDashboards(ctx, orgId)
|
||||
if err != nil {
|
||||
rsp.Error = err
|
||||
return rsp
|
||||
@ -58,13 +71,13 @@ func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *back
|
||||
|
||||
err = s.sql.GetSignedInUser(ctx, getSignedInUserQuery)
|
||||
if err != nil {
|
||||
fmt.Printf("error while retrieving user %s\n", err)
|
||||
s.logger.Error("Error while retrieving user", "error", err)
|
||||
rsp.Error = fmt.Errorf("auth error")
|
||||
return rsp
|
||||
}
|
||||
|
||||
if getSignedInUserQuery.Result == nil {
|
||||
fmt.Printf("no user %s", user.Email)
|
||||
s.logger.Error("No user found", "email", user.Email)
|
||||
rsp.Error = fmt.Errorf("auth error")
|
||||
return rsp
|
||||
}
|
||||
@ -80,153 +93,22 @@ func (s *StandardSearchService) DoDashboardQuery(ctx context.Context, user *back
|
||||
return rsp
|
||||
}
|
||||
|
||||
func (s *StandardSearchService) applyAuthFilter(user *models.SignedInUser, dash []dashMeta) ([]dashMeta, error) {
|
||||
func (s *StandardSearchService) applyAuthFilter(user *models.SignedInUser, dash []dashboard) ([]dashboard, error) {
|
||||
filter, err := s.auth.GetDashboardReadFilter(user)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create a list of all viewable dashboards for this user
|
||||
res := make([]dashMeta, 0, len(dash))
|
||||
res := make([]dashboard, 0, len(dash))
|
||||
for _, dash := range dash {
|
||||
if filter(dash.dash.UID) || (dash.is_folder && dash.dash.UID == "") { // include the "General" folder
|
||||
if filter(dash.info.UID) || (dash.isFolder && dash.info.UID == "") { // include the "General" folder
|
||||
res = append(res, dash)
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
type dashDataQueryResult struct {
|
||||
Id int64
|
||||
IsFolder bool `xorm:"is_folder"`
|
||||
FolderID int64 `xorm:"folder_id"`
|
||||
Slug string `xorm:"slug"`
|
||||
Data []byte
|
||||
Created time.Time
|
||||
Updated time.Time
|
||||
}
|
||||
|
||||
type dsQueryResult struct {
|
||||
UID string `xorm:"uid"`
|
||||
Type string `xorm:"type"`
|
||||
Name string `xorm:"name"`
|
||||
IsDefault bool `xorm:"is_default"`
|
||||
}
|
||||
|
||||
func loadDashboards(ctx context.Context, orgID int64, sql *sqlstore.SQLStore) ([]dashMeta, error) {
|
||||
meta := make([]dashMeta, 0, 200)
|
||||
|
||||
// Add the root folder ID (does not exist in SQL)
|
||||
meta = append(meta, dashMeta{
|
||||
id: 0,
|
||||
is_folder: true,
|
||||
folder_id: 0,
|
||||
slug: "",
|
||||
created: time.Now(),
|
||||
updated: time.Now(),
|
||||
dash: &extract.DashboardInfo{
|
||||
ID: 0,
|
||||
UID: "",
|
||||
Title: "General",
|
||||
},
|
||||
})
|
||||
|
||||
// key will allow name or uid
|
||||
lookup, err := loadDatasoureLookup(ctx, orgID, sql)
|
||||
if err != nil {
|
||||
return meta, err
|
||||
}
|
||||
|
||||
err = sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
rows := make([]*dashDataQueryResult, 0)
|
||||
|
||||
sess.Table("dashboard").
|
||||
Where("org_id = ?", orgID).
|
||||
Cols("id", "is_folder", "folder_id", "data", "slug", "created", "updated")
|
||||
|
||||
err := sess.Find(&rows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
dash := extract.ReadDashboard(bytes.NewReader(row.Data), lookup)
|
||||
|
||||
meta = append(meta, dashMeta{
|
||||
id: row.Id,
|
||||
is_folder: row.IsFolder,
|
||||
folder_id: row.FolderID,
|
||||
slug: row.Slug,
|
||||
created: row.Created,
|
||||
updated: row.Updated,
|
||||
dash: dash,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return meta, err
|
||||
}
|
||||
|
||||
func loadDatasoureLookup(ctx context.Context, orgID int64, sql *sqlstore.SQLStore) (extract.DatasourceLookup, error) {
|
||||
byUID := make(map[string]*extract.DataSourceRef, 50)
|
||||
byName := make(map[string]*extract.DataSourceRef, 50)
|
||||
var defaultDS *extract.DataSourceRef
|
||||
|
||||
err := sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
rows := make([]*dsQueryResult, 0)
|
||||
sess.Table("data_source").
|
||||
Where("org_id = ?", orgID).
|
||||
Cols("uid", "name", "type", "is_default")
|
||||
|
||||
err := sess.Find(&rows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
ds := &extract.DataSourceRef{
|
||||
UID: row.UID,
|
||||
Type: row.Type,
|
||||
}
|
||||
byUID[row.UID] = ds
|
||||
byName[row.Name] = ds
|
||||
if row.IsDefault {
|
||||
defaultDS = ds
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Lookup by UID or name
|
||||
return func(ref *extract.DataSourceRef) *extract.DataSourceRef {
|
||||
if ref == nil {
|
||||
return defaultDS
|
||||
}
|
||||
key := ""
|
||||
if ref.UID != "" {
|
||||
ds, ok := byUID[ref.UID]
|
||||
if ok {
|
||||
return ds
|
||||
}
|
||||
key = ref.UID
|
||||
}
|
||||
if key == "" {
|
||||
return defaultDS
|
||||
}
|
||||
ds, ok := byUID[key]
|
||||
if ok {
|
||||
return ds
|
||||
}
|
||||
return byName[key]
|
||||
}, err
|
||||
}
|
||||
|
||||
type simpleCounter struct {
|
||||
values map[string]int64
|
||||
}
|
||||
@ -250,7 +132,7 @@ func (c *simpleCounter) toFrame(name string) *data.Frame {
|
||||
}
|
||||
|
||||
// UGLY... but helpful for now
|
||||
func metaToFrame(meta []dashMeta) data.Frames {
|
||||
func metaToFrame(meta []dashboard) data.Frames {
|
||||
folderID := data.NewFieldFromFieldType(data.FieldTypeInt64, 0)
|
||||
folderUID := data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
||||
folderName := data.NewFieldFromFieldType(data.FieldTypeString, 0)
|
||||
@ -324,43 +206,43 @@ func metaToFrame(meta []dashMeta) data.Frames {
|
||||
folderCounter := make(map[int64]int64, 20)
|
||||
|
||||
for _, row := range meta {
|
||||
if row.is_folder {
|
||||
if row.isFolder {
|
||||
folderID.Append(row.id)
|
||||
folderUID.Append(row.dash.UID)
|
||||
folderName.Append(row.dash.Title)
|
||||
folderUID.Append(row.info.UID)
|
||||
folderName.Append(row.info.Title)
|
||||
folderDashCount.Append(int64(0)) // filled in later
|
||||
continue
|
||||
}
|
||||
|
||||
dashID.Append(row.id)
|
||||
dashUID.Append(row.dash.UID)
|
||||
dashFolderID.Append(row.folder_id)
|
||||
dashName.Append(row.dash.Title)
|
||||
dashDescr.Append(row.dash.Title)
|
||||
dashSchemaVersion.Append(row.dash.SchemaVersion)
|
||||
dashUID.Append(row.info.UID)
|
||||
dashFolderID.Append(row.folderID)
|
||||
dashName.Append(row.info.Title)
|
||||
dashDescr.Append(row.info.Title)
|
||||
dashSchemaVersion.Append(row.info.SchemaVersion)
|
||||
dashCreated.Append(row.created)
|
||||
dashUpdated.Append(row.updated)
|
||||
|
||||
// Increment the folder counter
|
||||
fcount, ok := folderCounter[row.folder_id]
|
||||
fcount, ok := folderCounter[row.folderID]
|
||||
if !ok {
|
||||
fcount = 0
|
||||
}
|
||||
folderCounter[row.folder_id] = fcount + 1
|
||||
folderCounter[row.folderID] = fcount + 1
|
||||
|
||||
url := fmt.Sprintf("/d/%s/%s", row.dash.UID, row.slug)
|
||||
url := fmt.Sprintf("/d/%s/%s", row.info.UID, row.slug)
|
||||
dashURL.Append(url)
|
||||
|
||||
// stats
|
||||
schemaVersionCounter.add(strconv.FormatInt(row.dash.SchemaVersion, 10))
|
||||
schemaVersionCounter.add(strconv.FormatInt(row.info.SchemaVersion, 10))
|
||||
|
||||
dashTags.Append(toJSONString(row.dash.Tags))
|
||||
dashPanelCount.Append(int64(len(row.dash.Panels)))
|
||||
dashVarCount.Append(int64(len(row.dash.TemplateVars)))
|
||||
dashDSList.Append(dsAsJSONString(row.dash.Datasource))
|
||||
dashTags.Append(toJSONString(row.info.Tags))
|
||||
dashPanelCount.Append(int64(len(row.info.Panels)))
|
||||
dashVarCount.Append(int64(len(row.info.TemplateVars)))
|
||||
dashDSList.Append(dsAsJSONString(row.info.Datasource))
|
||||
|
||||
// Row for each panel
|
||||
for _, panel := range row.dash.Panels {
|
||||
for _, panel := range row.info.Panels {
|
||||
panelDashID.Append(row.id)
|
||||
panelID.Append(panel.ID)
|
||||
panelName.Append(panel.Title)
|
||||
|
@ -28,3 +28,7 @@ func (s *stubSearchService) DoDashboardQuery(ctx context.Context, user *backend.
|
||||
|
||||
return rsp
|
||||
}
|
||||
|
||||
func (s *stubSearchService) Run(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -3,6 +3,8 @@ package searchV2
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
)
|
||||
|
||||
@ -11,5 +13,6 @@ type DashboardQuery struct {
|
||||
}
|
||||
|
||||
type SearchService interface {
|
||||
registry.BackgroundService
|
||||
DoDashboardQuery(ctx context.Context, user *backend.User, orgId int64, query DashboardQuery) *backend.DataResponse
|
||||
}
|
||||
|
18
pkg/services/sqlstore/migrations/entity_events_mig.go
Normal file
18
pkg/services/sqlstore/migrations/entity_events_mig.go
Normal file
@ -0,0 +1,18 @@
|
||||
package migrations
|
||||
|
||||
import . "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
|
||||
|
||||
func addEntityEventsTableMigration(mg *Migrator) {
|
||||
entityEventsTable := Table{
|
||||
Name: "entity_event",
|
||||
Columns: []*Column{
|
||||
{Name: "id", Type: DB_BigInt, Nullable: false, IsPrimaryKey: true, IsAutoIncrement: true},
|
||||
{Name: "entity_id", Type: DB_NVarchar, Length: 1024, Nullable: false},
|
||||
{Name: "event_type", Type: DB_NVarchar, Length: 8, Nullable: false},
|
||||
{Name: "created", Type: DB_BigInt, Nullable: false},
|
||||
},
|
||||
Indices: []*Index{},
|
||||
}
|
||||
|
||||
mg.AddMigration("create entity_events table", NewAddTableMigration(entityEventsTable))
|
||||
}
|
@ -87,6 +87,8 @@ func (*OSSMigrations) AddMigration(mg *Migrator) {
|
||||
addCommentMigrations(mg)
|
||||
}
|
||||
}
|
||||
|
||||
addEntityEventsTableMigration(mg)
|
||||
}
|
||||
|
||||
func addMigrationLogMigrations(mg *Migrator) {
|
||||
|
@ -438,6 +438,7 @@ type InitTestDBOpt struct {
|
||||
var featuresEnabledDuringTests = []string{
|
||||
featuremgmt.FlagDashboardPreviews,
|
||||
featuremgmt.FlagDashboardComments,
|
||||
featuremgmt.FlagPanelTitleSearch,
|
||||
}
|
||||
|
||||
// InitTestDBWithMigration initializes the test DB given custom migrations.
|
||||
|
179
pkg/services/store/entity_events.go
Normal file
179
pkg/services/store/entity_events.go
Normal file
@ -0,0 +1,179 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
type EntityEventType string
|
||||
|
||||
const (
|
||||
EntityEventTypeDelete EntityEventType = "delete"
|
||||
EntityEventTypeCreate EntityEventType = "create"
|
||||
EntityEventTypeUpdate EntityEventType = "update"
|
||||
)
|
||||
|
||||
type EntityType string
|
||||
|
||||
const (
|
||||
EntityTypeDashboard EntityType = "dashboard"
|
||||
)
|
||||
|
||||
// CreateDatabaseEntityId creates entityId for entities stored in the existing SQL tables
|
||||
func CreateDatabaseEntityId(internalId interface{}, orgId int64, entityType EntityType) string {
|
||||
var internalIdAsString string
|
||||
switch id := internalId.(type) {
|
||||
case string:
|
||||
internalIdAsString = id
|
||||
default:
|
||||
internalIdAsString = fmt.Sprintf("%#v", internalId)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("database/%d/%s/%s", orgId, entityType, internalIdAsString)
|
||||
}
|
||||
|
||||
type EntityEvent struct {
|
||||
Id int64
|
||||
EventType EntityEventType
|
||||
EntityId string
|
||||
Created int64
|
||||
}
|
||||
|
||||
type SaveEventCmd struct {
|
||||
EntityId string
|
||||
EventType EntityEventType
|
||||
}
|
||||
|
||||
// EntityEventsService is a temporary solution to support change notifications in an HA setup
|
||||
// With this service each system can query for any events that have happened since a fixed time
|
||||
//go:generate mockery --name EntityEventsService --structname MockEntityEventsService --inpackage --filename entity_events_mock.go
|
||||
type EntityEventsService interface {
|
||||
registry.BackgroundService
|
||||
registry.CanBeDisabled
|
||||
SaveEvent(ctx context.Context, cmd SaveEventCmd) error
|
||||
GetLastEvent(ctx context.Context) (*EntityEvent, error)
|
||||
GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error)
|
||||
|
||||
deleteEventsOlderThan(ctx context.Context, duration time.Duration) error
|
||||
}
|
||||
|
||||
func ProvideEntityEventsService(cfg *setting.Cfg, sqlStore *sqlstore.SQLStore, features featuremgmt.FeatureToggles) EntityEventsService {
|
||||
if !features.IsEnabled(featuremgmt.FlagPanelTitleSearch) {
|
||||
return &dummyEntityEventsService{}
|
||||
}
|
||||
|
||||
return &entityEventService{
|
||||
sql: sqlStore,
|
||||
features: features,
|
||||
log: log.New("entity-events"),
|
||||
}
|
||||
}
|
||||
|
||||
type entityEventService struct {
|
||||
sql *sqlstore.SQLStore
|
||||
log log.Logger
|
||||
features featuremgmt.FeatureToggles
|
||||
}
|
||||
|
||||
func (e *entityEventService) SaveEvent(ctx context.Context, cmd SaveEventCmd) error {
|
||||
return e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
_, err := sess.Insert(&EntityEvent{
|
||||
EventType: cmd.EventType,
|
||||
EntityId: cmd.EntityId,
|
||||
Created: time.Now().Unix(),
|
||||
})
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (e *entityEventService) GetLastEvent(ctx context.Context) (*EntityEvent, error) {
|
||||
var entityEvent *EntityEvent
|
||||
err := e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
bean := &EntityEvent{}
|
||||
found, err := sess.OrderBy("id desc").Get(bean)
|
||||
if found {
|
||||
entityEvent = bean
|
||||
}
|
||||
return err
|
||||
})
|
||||
|
||||
return entityEvent, err
|
||||
}
|
||||
|
||||
func (e *entityEventService) GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error) {
|
||||
var evs = make([]*EntityEvent, 0)
|
||||
err := e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
return sess.OrderBy("id asc").Where("id > ?", id).Find(&evs)
|
||||
})
|
||||
|
||||
return evs, err
|
||||
}
|
||||
|
||||
func (e *entityEventService) deleteEventsOlderThan(ctx context.Context, duration time.Duration) error {
|
||||
return e.sql.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
maxCreated := time.Now().Add(-duration)
|
||||
deletedCount, err := sess.Where("created < ?", maxCreated.Unix()).Delete(&EntityEvent{})
|
||||
e.log.Info("deleting old events", "count", deletedCount, "maxCreated", maxCreated)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (e *entityEventService) IsDisabled() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (e *entityEventService) Run(ctx context.Context) error {
|
||||
clean := time.NewTicker(1 * time.Hour)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-clean.C:
|
||||
go func() {
|
||||
err := e.deleteEventsOlderThan(context.Background(), 24*time.Hour)
|
||||
if err != nil {
|
||||
e.log.Info("failed to delete old entity events", "error", err)
|
||||
}
|
||||
}()
|
||||
case <-ctx.Done():
|
||||
e.log.Debug("Grafana is shutting down - stopping entity events service")
|
||||
clean.Stop()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type dummyEntityEventsService struct {
|
||||
}
|
||||
|
||||
func (d dummyEntityEventsService) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d dummyEntityEventsService) IsDisabled() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (d dummyEntityEventsService) SaveEvent(ctx context.Context, cmd SaveEventCmd) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d dummyEntityEventsService) GetLastEvent(ctx context.Context) (*EntityEvent, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (d dummyEntityEventsService) GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error) {
|
||||
return make([]*EntityEvent, 0), nil
|
||||
}
|
||||
|
||||
func (d dummyEntityEventsService) deleteEventsOlderThan(ctx context.Context, duration time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ EntityEventsService = &dummyEntityEventsService{}
|
117
pkg/services/store/entity_events_mock.go
Normal file
117
pkg/services/store/entity_events_mock.go
Normal file
@ -0,0 +1,117 @@
|
||||
// Code generated by mockery v2.10.6. DO NOT EDIT.
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
context "context"
|
||||
time "time"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockEntityEventsService is an autogenerated mock type for the EntityEventsService type
|
||||
type MockEntityEventsService struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// GetAllEventsAfter provides a mock function with given fields: ctx, id
|
||||
func (_m *MockEntityEventsService) GetAllEventsAfter(ctx context.Context, id int64) ([]*EntityEvent, error) {
|
||||
ret := _m.Called(ctx, id)
|
||||
|
||||
var r0 []*EntityEvent
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64) []*EntityEvent); ok {
|
||||
r0 = rf(ctx, id)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).([]*EntityEvent)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok {
|
||||
r1 = rf(ctx, id)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// GetLastEvent provides a mock function with given fields: ctx
|
||||
func (_m *MockEntityEventsService) GetLastEvent(ctx context.Context) (*EntityEvent, error) {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 *EntityEvent
|
||||
if rf, ok := ret.Get(0).(func(context.Context) *EntityEvent); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*EntityEvent)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
|
||||
r1 = rf(ctx)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// IsDisabled provides a mock function with given fields:
|
||||
func (_m *MockEntityEventsService) IsDisabled() bool {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func() bool); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// Run provides a mock function with given fields: ctx
|
||||
func (_m *MockEntityEventsService) Run(ctx context.Context) error {
|
||||
ret := _m.Called(ctx)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(ctx)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// SaveEvent provides a mock function with given fields: ctx, cmd
|
||||
func (_m *MockEntityEventsService) SaveEvent(ctx context.Context, cmd SaveEventCmd) error {
|
||||
ret := _m.Called(ctx, cmd)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, SaveEventCmd) error); ok {
|
||||
r0 = rf(ctx, cmd)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// deleteEventsOlderThan provides a mock function with given fields: ctx, duration
|
||||
func (_m *MockEntityEventsService) deleteEventsOlderThan(ctx context.Context, duration time.Duration) error {
|
||||
ret := _m.Called(ctx, duration)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, time.Duration) error); ok {
|
||||
r0 = rf(ctx, duration)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
182
pkg/services/store/entity_events_test.go
Normal file
182
pkg/services/store/entity_events_test.go
Normal file
@ -0,0 +1,182 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEntityEventsService(t *testing.T) {
|
||||
var ctx context.Context
|
||||
var service EntityEventsService
|
||||
|
||||
setup := func() {
|
||||
service = &entityEventService{
|
||||
sql: sqlstore.InitTestDB(t),
|
||||
log: log.New("entity-event-test"),
|
||||
}
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
t.Run("Should insert an entity event", func(t *testing.T) {
|
||||
setup()
|
||||
|
||||
err := service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/1",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Should retrieve nil entity if database is empty", func(t *testing.T) {
|
||||
setup()
|
||||
|
||||
ev, err := service.GetLastEvent(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, ev)
|
||||
})
|
||||
|
||||
t.Run("Should retrieve last entity event", func(t *testing.T) {
|
||||
setup()
|
||||
lastEventEntityId := "database/dash/1"
|
||||
|
||||
err := service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/3",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/2",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: lastEventEntityId,
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
lastEv, err := service.GetLastEvent(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, lastEventEntityId, lastEv.EntityId)
|
||||
})
|
||||
|
||||
t.Run("Should retrieve sorted events after an id", func(t *testing.T) {
|
||||
setup()
|
||||
lastEventEntityId := "database/dash/1"
|
||||
|
||||
err := service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/3",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
firstEv, err := service.GetLastEvent(ctx)
|
||||
firstEvId := firstEv.Id
|
||||
|
||||
err = service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/2",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
err = service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: lastEventEntityId,
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
evs, err := service.GetAllEventsAfter(ctx, firstEvId)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, evs, 2)
|
||||
require.Equal(t, evs[0].EntityId, "database/dash/2")
|
||||
require.Equal(t, evs[1].EntityId, lastEventEntityId)
|
||||
})
|
||||
|
||||
t.Run("Should delete old events", func(t *testing.T) {
|
||||
setup()
|
||||
_ = service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/3",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
_ = service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/2",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
_ = service.SaveEvent(ctx, SaveEventCmd{
|
||||
EntityId: "database/dash/1",
|
||||
EventType: EntityEventTypeCreate,
|
||||
})
|
||||
|
||||
evs, err := service.GetAllEventsAfter(ctx, 0)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, evs, 3)
|
||||
|
||||
err = service.deleteEventsOlderThan(ctx, 24*time.Hour)
|
||||
require.NoError(t, err)
|
||||
|
||||
// did not delete any events
|
||||
evs, err = service.GetAllEventsAfter(ctx, 0)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, evs, 3)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
err = service.deleteEventsOlderThan(ctx, 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
// deleted all events
|
||||
evs, err = service.GetAllEventsAfter(ctx, 0)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, evs, 0)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCreateDatabaseEntityId(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
entityType EntityType
|
||||
orgId int64
|
||||
internalId interface{}
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "int64 internal id",
|
||||
entityType: EntityTypeDashboard,
|
||||
orgId: 10,
|
||||
internalId: int64(45),
|
||||
expected: "database/10/dashboard/45",
|
||||
},
|
||||
{
|
||||
name: "big-ish int64 internal id",
|
||||
entityType: EntityTypeDashboard,
|
||||
orgId: 10,
|
||||
internalId: int64(12412421),
|
||||
expected: "database/10/dashboard/12412421",
|
||||
},
|
||||
{
|
||||
name: "int internal id",
|
||||
entityType: EntityTypeDashboard,
|
||||
orgId: 10,
|
||||
internalId: int(1244),
|
||||
expected: "database/10/dashboard/1244",
|
||||
},
|
||||
{
|
||||
name: "string internal id",
|
||||
entityType: EntityTypeDashboard,
|
||||
orgId: 10,
|
||||
internalId: "string-internal-id",
|
||||
expected: "database/10/dashboard/string-internal-id",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require.Equal(t, tt.expected, CreateDatabaseEntityId(tt.internalId, tt.orgId, tt.entityType))
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user