mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Playlist: allow using object store as the backend (#57467)
This commit is contained in:
parent
6abd4cd8e8
commit
e6575bab76
179
pkg/services/playlist/playlistimpl/object_store.go
Normal file
179
pkg/services/playlist/playlistimpl/object_store.go
Normal file
@ -0,0 +1,179 @@
|
||||
package playlistimpl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/playlist"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/session"
|
||||
objectstore "github.com/grafana/grafana/pkg/services/store"
|
||||
"github.com/grafana/grafana/pkg/services/store/object"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
)
|
||||
|
||||
// This is a playlist implementation that will:
|
||||
// 1. CREATE/UPDATE/DELETE everythign with existing direct SQL store
|
||||
// 2. CREATE/UPDATE/DELETE same items to the object store
|
||||
// 3. Use the object store for all read operations
|
||||
// This givs us a safe test bed to work with the store but still roll back without any lost work
|
||||
type objectStoreImpl struct {
|
||||
sess *session.SessionDB
|
||||
sqlimpl *Service
|
||||
objectstore object.ObjectStoreServer
|
||||
}
|
||||
|
||||
var _ playlist.Service = &objectStoreImpl{}
|
||||
|
||||
func (s *objectStoreImpl) sync() {
|
||||
rows, err := s.sess.Query(context.Background(), "SELECT org_id,uid FROM playlist ORDER BY org_id asc")
|
||||
if err != nil {
|
||||
fmt.Printf("error loading playlists")
|
||||
return
|
||||
}
|
||||
|
||||
// Change the org_id with each row
|
||||
rowUser := &user.SignedInUser{
|
||||
Login: "?",
|
||||
OrgID: 0, // gets filled in from each row
|
||||
UserID: 0,
|
||||
}
|
||||
ctx := objectstore.ContextWithUser(context.Background(), rowUser)
|
||||
uid := ""
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&rowUser.OrgID, &uid)
|
||||
if err != nil {
|
||||
fmt.Printf("error loading playlists: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
dto, err := s.sqlimpl.Get(ctx, &playlist.GetPlaylistByUidQuery{
|
||||
OrgId: rowUser.OrgID,
|
||||
UID: uid,
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Printf("error loading playlist: %v", err)
|
||||
return
|
||||
}
|
||||
body, _ := json.Marshal(dto)
|
||||
_, _ = s.objectstore.Write(ctx, &object.WriteObjectRequest{
|
||||
UID: uid,
|
||||
Kind: models.StandardKindPlaylist,
|
||||
Body: body,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *objectStoreImpl) Create(ctx context.Context, cmd *playlist.CreatePlaylistCommand) (*playlist.Playlist, error) {
|
||||
rsp, err := s.sqlimpl.store.Insert(ctx, cmd)
|
||||
if err == nil && rsp != nil {
|
||||
body, err := json.Marshal(cmd)
|
||||
if err != nil {
|
||||
return rsp, fmt.Errorf("unable to write playlist to store")
|
||||
}
|
||||
_, err = s.objectstore.Write(ctx, &object.WriteObjectRequest{
|
||||
UID: rsp.UID,
|
||||
Kind: models.StandardKindPlaylist,
|
||||
Body: body,
|
||||
})
|
||||
if err != nil {
|
||||
return rsp, fmt.Errorf("unable to write playlist to store")
|
||||
}
|
||||
}
|
||||
return rsp, err
|
||||
}
|
||||
|
||||
func (s *objectStoreImpl) Update(ctx context.Context, cmd *playlist.UpdatePlaylistCommand) (*playlist.PlaylistDTO, error) {
|
||||
rsp, err := s.sqlimpl.store.Update(ctx, cmd)
|
||||
if err == nil {
|
||||
body, err := json.Marshal(cmd)
|
||||
if err != nil {
|
||||
return rsp, fmt.Errorf("unable to write playlist to store")
|
||||
}
|
||||
_, err = s.objectstore.Write(ctx, &object.WriteObjectRequest{
|
||||
UID: rsp.Uid,
|
||||
Kind: models.StandardKindPlaylist,
|
||||
Body: body,
|
||||
})
|
||||
if err != nil {
|
||||
return rsp, fmt.Errorf("unable to write playlist to store")
|
||||
}
|
||||
}
|
||||
return rsp, err
|
||||
}
|
||||
|
||||
func (s *objectStoreImpl) Delete(ctx context.Context, cmd *playlist.DeletePlaylistCommand) error {
|
||||
err := s.sqlimpl.store.Delete(ctx, cmd)
|
||||
if err == nil {
|
||||
_, err = s.objectstore.Delete(ctx, &object.DeleteObjectRequest{
|
||||
UID: cmd.UID,
|
||||
Kind: models.StandardKindPlaylist,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to delete playlist to store")
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
//------------------------------------------------------
|
||||
// Read access is managed entirely by the object store
|
||||
//------------------------------------------------------
|
||||
|
||||
func (s *objectStoreImpl) GetWithoutItems(ctx context.Context, q *playlist.GetPlaylistByUidQuery) (*playlist.Playlist, error) {
|
||||
p, err := s.Get(ctx, q) // OrgID is actually picked from the user!
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &playlist.Playlist{
|
||||
UID: p.Uid,
|
||||
OrgId: q.OrgId,
|
||||
Name: p.Name,
|
||||
Interval: p.Interval,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *objectStoreImpl) Get(ctx context.Context, q *playlist.GetPlaylistByUidQuery) (*playlist.PlaylistDTO, error) {
|
||||
rsp, err := s.objectstore.Read(ctx, &object.ReadObjectRequest{
|
||||
UID: q.UID,
|
||||
Kind: models.StandardKindPlaylist,
|
||||
WithBody: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rsp.Object == nil || rsp.Object.Body == nil {
|
||||
return nil, fmt.Errorf("missing object")
|
||||
}
|
||||
|
||||
// Get the object from payload
|
||||
found := &playlist.PlaylistDTO{}
|
||||
err = json.Unmarshal(rsp.Object.Body, found)
|
||||
return found, err
|
||||
}
|
||||
|
||||
func (s *objectStoreImpl) Search(ctx context.Context, q *playlist.GetPlaylistsQuery) (playlist.Playlists, error) {
|
||||
playlists := make(playlist.Playlists, 0)
|
||||
|
||||
rsp, err := s.objectstore.Search(ctx, &object.ObjectSearchRequest{
|
||||
Kind: []string{models.StandardKindPlaylist},
|
||||
WithBody: true,
|
||||
Limit: 1000,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, res := range rsp.Results {
|
||||
found := &playlist.PlaylistDTO{}
|
||||
if res.Body != nil {
|
||||
err = json.Unmarshal(res.Body, found)
|
||||
}
|
||||
playlists = append(playlists, &playlist.Playlist{
|
||||
UID: res.UID,
|
||||
Name: res.Name,
|
||||
Interval: found.Interval,
|
||||
})
|
||||
}
|
||||
return playlists, err
|
||||
}
|
@ -4,27 +4,44 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/playlist"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/services/store/object"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
store store
|
||||
}
|
||||
|
||||
func ProvideService(db db.DB, cfg *setting.Cfg) playlist.Service {
|
||||
if cfg.IsFeatureToggleEnabled("newDBLibrary") {
|
||||
return &Service{
|
||||
store: &sqlxStore{
|
||||
sess: db.GetSqlxSession(),
|
||||
},
|
||||
var _ playlist.Service = &Service{}
|
||||
|
||||
func ProvideService(db db.DB, toggles featuremgmt.FeatureToggles, objserver object.ObjectStoreServer) playlist.Service {
|
||||
var sqlstore store
|
||||
|
||||
// 🐢🐢🐢 pick the store
|
||||
if toggles.IsEnabled("newDBLibrary") { // hymmm not a registered feature flag
|
||||
sqlstore = &sqlxStore{
|
||||
sess: db.GetSqlxSession(),
|
||||
}
|
||||
} else {
|
||||
sqlstore = &sqlStore{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
return &Service{
|
||||
store: &sqlStore{
|
||||
db: db,
|
||||
},
|
||||
svc := &Service{store: sqlstore}
|
||||
|
||||
// FlagObjectStore is only supported in development mode
|
||||
if toggles.IsEnabled(featuremgmt.FlagObjectStore) {
|
||||
impl := &objectStoreImpl{
|
||||
sqlimpl: svc,
|
||||
objectstore: objserver,
|
||||
sess: db.GetSqlxSession(),
|
||||
}
|
||||
impl.sync() // load everythign from the existing SQL setup into the new object store
|
||||
return impl
|
||||
}
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
func (s *Service) Create(ctx context.Context, cmd *playlist.CreatePlaylistCommand) (*playlist.Playlist, error) {
|
||||
|
@ -351,14 +351,24 @@ func (i *dummyObjectServer) Search(ctx context.Context, r *object.ObjectSearchRe
|
||||
|
||||
searchResults := make([]*object.ObjectSearchResult, 0)
|
||||
for _, o := range objects {
|
||||
builder := i.kinds.GetSummaryBuilder(o.Object.Kind)
|
||||
if builder == nil {
|
||||
continue
|
||||
}
|
||||
summary, clean, e2 := builder(ctx, o.Object.UID, o.Object.Body)
|
||||
if e2 != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
searchResults = append(searchResults, &object.ObjectSearchResult{
|
||||
UID: o.Object.UID,
|
||||
Kind: o.Object.Kind,
|
||||
Version: o.Object.Version,
|
||||
Updated: o.Object.Updated,
|
||||
UpdatedBy: o.Object.UpdatedBy,
|
||||
Name: "? name from summary",
|
||||
Body: o.Object.Body,
|
||||
UID: o.Object.UID,
|
||||
Kind: o.Object.Kind,
|
||||
Version: o.Object.Version,
|
||||
Updated: o.Object.Updated,
|
||||
UpdatedBy: o.Object.UpdatedBy,
|
||||
Name: summary.Name,
|
||||
Description: summary.Description,
|
||||
Body: clean,
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user