with dashboard service

This commit is contained in:
Ryan McKinley 2024-06-27 00:10:30 +03:00
parent 86a7064334
commit 7345ece8ef
13 changed files with 467 additions and 528 deletions

View File

@ -15,6 +15,7 @@ const (
NamespaceAnonymous Namespace = "anonymous"
NamespaceRenderService Namespace = "render"
NamespaceAccessPolicy Namespace = "access-policy"
NamespaceProvisioning Namespace = "provisioning"
NamespaceEmpty Namespace = ""
)

View File

@ -5,23 +5,23 @@ import (
"database/sql"
"fmt"
"path/filepath"
"strings"
"sync"
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
dashboardsV0 "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
gapiutil "github.com/grafana/grafana/pkg/services/apiserver/utils"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var (
@ -29,6 +29,9 @@ var (
)
type dashboardRow struct {
// The numeric resource version for this dashboard
ResourceVersion int64
// Dashboard resource
Dash *dashboardsV0.Dashboard
@ -38,9 +41,6 @@ type dashboardRow struct {
// The folder UID (needed for access control checks)
FolderUID string
// Needed for fast summary access
Tags []string
// Size (in bytes) of the dashboard payload
Bytes int
@ -55,6 +55,10 @@ type dashboardSqlAccess struct {
namespacer request.NamespaceMapper
dashStore dashboards.Store
provisioning provisioning.ProvisioningService
// Typically one... the server wrapper
subscribers []chan *resource.WrittenEvent
mutex sync.Mutex
}
func NewDashboardAccess(sql db.DB, namespacer request.NamespaceMapper, dashStore dashboards.Store, provisioning provisioning.ProvisioningService) DashboardAccess {
@ -71,8 +75,8 @@ const selector = `SELECT
dashboard.org_id, dashboard.id,
dashboard.uid,slug,
dashboard.folder_uid,
dashboard.created,dashboard.created_by,CreatedUSER.login,
dashboard.updated,dashboard.updated_by,UpdatedUSER.login,
dashboard.created,dashboard.created_by,CreatedUSER.uid as created_uid,
dashboard.updated,dashboard.updated_by,UpdatedUSER.uid as updated_uid,
plugin_id,
dashboard_provisioning.name as origin_name,
dashboard_provisioning.external_id as origin_path,
@ -87,52 +91,31 @@ const selector = `SELECT
LEFT OUTER JOIN user AS UpdatedUSER ON dashboard.created_by = UpdatedUSER.id
WHERE is_folder = false`
func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery, onlySummary bool) (*rowsWrapper, int, error) {
if !query.Labels.Empty() {
return nil, 0, fmt.Errorf("label selection not yet supported")
}
if len(query.Requirements.SortBy) > 0 {
return nil, 0, fmt.Errorf("sorting not yet supported")
}
if query.Requirements.ListHistory != "" {
return nil, 0, fmt.Errorf("ListHistory not yet supported")
}
if query.Requirements.ListDeleted {
return nil, 0, fmt.Errorf("ListDeleted not yet supported")
func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery) (*rowsWrapper, int, error) {
if len(query.Labels) > 0 {
return nil, 0, fmt.Errorf("labels not yet supported")
// if query.Requirements.Folder != nil {
// args = append(args, *query.Requirements.Folder)
// sqlcmd = fmt.Sprintf("%s AND dashboard.folder_uid=$%d", sqlcmd, len(args))
// }
}
token, err := readContinueToken(query)
if err != nil {
return nil, 0, err
}
args := []any{query.OrgID}
sqlcmd := fmt.Sprintf("%s AND dashboard.org_id=$%d", selector, len(args))
limit := query.Limit
if limit < 1 {
limit = 15 //
}
args := []any{query.OrgID}
sqlcmd := selector
// We can not do this yet because title + tags are in the body
if onlySummary && false {
sqlcmd = strings.Replace(sqlcmd, "dashboard.data", `"{}"`, 1)
}
sqlcmd = fmt.Sprintf("%s AND dashboard.org_id=$%d", sqlcmd, len(args))
if query.UID != "" {
args = append(args, query.UID)
sqlcmd = fmt.Sprintf("%s AND dashboard.uid=$%d", sqlcmd, len(args))
} else {
args = append(args, token.id)
} else if query.MinID > 0 {
args = append(args, query.MinID)
sqlcmd = fmt.Sprintf("%s AND dashboard.id>=$%d", sqlcmd, len(args))
}
if query.Requirements.Folder != nil {
args = append(args, *query.Requirements.Folder)
sqlcmd = fmt.Sprintf("%s AND dashboard.folder_uid=$%d", sqlcmd, len(args))
}
args = append(args, (limit + 2)) // add more so we can include a next token
sqlcmd = fmt.Sprintf("%s ORDER BY dashboard.id asc LIMIT $%d", sqlcmd, len(args))
@ -146,51 +129,8 @@ func (a *dashboardSqlAccess) getRows(ctx context.Context, query *DashboardQuery,
return rows, limit, err
}
// GetDashboards implements DashboardAccess.
func (a *dashboardSqlAccess) GetDashboards(ctx context.Context, query *DashboardQuery) (*dashboardsV0.DashboardList, error) {
rows, limit, err := a.getRows(ctx, query, false)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
totalSize := 0
list := &dashboardsV0.DashboardList{}
for {
row, err := rows.Next()
if err != nil || row == nil {
return list, err
}
totalSize += row.Bytes
if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) {
if query.Requirements.Folder != nil {
row.token.folder = *query.Requirements.Folder
}
list.Continue = row.token.String() // will skip this one but start here next time
return list, err
}
list.Items = append(list.Items, *row.Dash)
}
}
func (a *dashboardSqlAccess) GetDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, error) {
r, err := a.GetDashboards(ctx, &DashboardQuery{
OrgID: orgId,
UID: uid,
Labels: labels.Everything(),
})
if err != nil {
return nil, err
}
if len(r.Items) > 0 {
return &r.Items[0], nil
}
return nil, fmt.Errorf("not found")
}
func (a *dashboardSqlAccess) doQuery(ctx context.Context, query string, args ...any) (*rowsWrapper, error) {
user, err := appcontext.User(ctx)
_, err := identity.GetRequester(ctx)
if err != nil {
return nil, err
}
@ -199,7 +139,10 @@ func (a *dashboardSqlAccess) doQuery(ctx context.Context, query string, args ...
rows: rows,
a: a,
// This looks up rules from the permissions on a user
canReadDashboard: accesscontrol.Checker(user, dashboards.ActionDashboardsRead),
canReadDashboard: func(scopes ...string) bool {
return true // ???
},
// accesscontrol.Checker(user, dashboards.ActionDashboardsRead),
}, err
}
@ -230,7 +173,7 @@ func (r *rowsWrapper) Next() (*dashboardRow, error) {
if !r.canReadDashboard(scopes...) {
continue
}
d.token.size = r.total // size before next!
d.token.bytes = r.total // size before next!
r.total += int64(d.Bytes)
}
@ -243,7 +186,7 @@ func (r *rowsWrapper) Next() (*dashboardRow, error) {
func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
dash := &dashboardsV0.Dashboard{
TypeMeta: dashboardsV0.DashboardResourceInfo.TypeMeta(),
ObjectMeta: v1.ObjectMeta{Annotations: make(map[string]string)},
ObjectMeta: metav1.ObjectMeta{Annotations: make(map[string]string)},
}
row := &dashboardRow{Dash: dash}
@ -253,11 +196,11 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
var folder_uid sql.NullString
var updated time.Time
var updatedByID int64
var updatedByName sql.NullString
var updatedByUID sql.NullString
var created time.Time
var createdByID int64
var createdByName sql.NullString
var createdByUID sql.NullString
var plugin_id string
var origin_name sql.NullString
@ -269,8 +212,8 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
err := rows.Scan(&orgId, &dashboard_id, &dash.Name,
&slug, &folder_uid,
&created, &createdByID, &createdByName,
&updated, &updatedByID, &updatedByName,
&created, &createdByID, &createdByUID,
&updated, &updatedByID, &updatedByUID,
&plugin_id,
&origin_name, &origin_path, &origin_hash, &origin_ts,
&version,
@ -279,10 +222,11 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
row.token = &continueToken{orgId: orgId, id: dashboard_id}
if err == nil {
dash.ResourceVersion = fmt.Sprintf("%d", created.UnixMilli())
row.ResourceVersion = updated.UnixNano() + version
dash.ResourceVersion = fmt.Sprintf("%d", row.ResourceVersion)
dash.Namespace = a.namespacer(orgId)
dash.UID = gapiutil.CalculateClusterWideUID(dash)
dash.SetCreationTimestamp(v1.NewTime(created))
dash.SetCreationTimestamp(metav1.NewTime(created))
meta, err := utils.MetaAccessor(dash)
if err != nil {
return nil, err
@ -290,10 +234,14 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
meta.SetUpdatedTimestamp(&updated)
meta.SetSlug(slug)
if createdByID > 0 {
meta.SetCreatedBy(fmt.Sprintf("user:%d/%s", createdByID, createdByName.String))
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceUser, createdByID).String())
} else if createdByID < 0 {
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceProvisioning, 0).String())
}
if updatedByID > 0 {
meta.SetUpdatedBy(fmt.Sprintf("user:%d/%s", updatedByID, updatedByName.String))
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceUser, updatedByID).String())
} else if updatedByID < 0 {
meta.SetCreatedBy(identity.NewNamespaceID(identity.NamespaceProvisioning, 0).String())
}
if folder_uid.Valid {
meta.SetFolder(folder_uid.String)
@ -333,7 +281,6 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
}
dash.Spec.Set("id", dashboard_id) // add it so we can get it from the body later
row.Title = dash.Spec.GetNestedString("title")
row.Tags = dash.Spec.GetNestedStringSlice("tags")
}
}
return row, err
@ -341,7 +288,7 @@ func (a *dashboardSqlAccess) scanRow(rows *sql.Rows) (*dashboardRow, error) {
// DeleteDashboard implements DashboardAccess.
func (a *dashboardSqlAccess) DeleteDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, bool, error) {
dash, err := a.GetDashboard(ctx, orgId, uid)
dash, _, err := a.GetDashboard(ctx, orgId, uid)
if err != nil {
return nil, false, err
}
@ -404,6 +351,6 @@ func (a *dashboardSqlAccess) SaveDashboard(ctx context.Context, orgId int64, das
if out != nil {
created = (out.Created.Unix() == out.Updated.Unix()) // and now?
}
dash, err = a.GetDashboard(ctx, orgId, out.UID)
dash, _, err = a.GetDashboard(ctx, orgId, out.UID)
return dash, created, err
}

View File

@ -0,0 +1,264 @@
package access
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana/pkg/apimachinery/utils"
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
func getDashbaordFromEvent(event resource.WriteEvent) (*dashboard.Dashboard, error) {
obj, ok := event.Object.GetRuntimeObject()
if ok && obj != nil {
dash, ok := obj.(*dashboard.Dashboard)
if ok {
return dash, nil
}
}
dash := &dashboard.Dashboard{}
err := json.Unmarshal(event.Value, dash)
return dash, err
}
func isDashboardKey(key *resource.ResourceKey, requireName bool) error {
gr := dashboard.DashboardResourceInfo.GroupResource()
if key.Group != gr.Group {
return fmt.Errorf("expecting dashboard group")
}
if key.Resource != gr.Resource {
return fmt.Errorf("expecting dashboard resource")
}
if requireName && key.Name == "" {
return fmt.Errorf("expecting dashboard name (uid)")
}
return nil
}
func (a *dashboardSqlAccess) WriteEvent(ctx context.Context, event resource.WriteEvent) (rv int64, err error) {
info, err := request.ParseNamespace(event.Key.Namespace)
if err == nil {
err = isDashboardKey(event.Key, true)
}
if err != nil {
return 0, err
}
switch event.Type {
case resource.WatchEvent_DELETED:
{
_, _, err = a.DeleteDashboard(ctx, info.OrgID, event.Key.Name)
rv = event.EventID
}
// The difference depends on embedded internal ID
case resource.WatchEvent_ADDED, resource.WatchEvent_MODIFIED:
{
dash, err := getDashbaordFromEvent(event)
if err != nil {
return 0, err
}
after, _, err := a.SaveDashboard(ctx, info.OrgID, dash)
if err != nil {
return 0, err
}
if after != nil {
meta, err := utils.MetaAccessor(after)
if err != nil {
return 0, err
}
rv, err = meta.GetResourceVersionInt64()
if err != nil {
return 0, err
}
}
}
default:
return 0, fmt.Errorf("unsupported event type: %v", event.Type)
}
// Async notify all subscribers (not HA!!!)
if a.subscribers != nil {
go func() {
write := &resource.WrittenEvent{
WriteEvent: event,
Timestamp: time.Now().UnixMilli(),
ResourceVersion: rv,
}
for _, sub := range a.subscribers {
sub <- write
}
}()
}
return rv, err
}
// Read implements ResourceStoreServer.
func (a *dashboardSqlAccess) GetDashboard(ctx context.Context, orgId int64, uid string) (*dashboard.Dashboard, int64, error) {
rows, _, err := a.getRows(ctx, &DashboardQuery{
OrgID: orgId,
UID: uid,
Limit: 100, // will only be one!
})
if err != nil {
return nil, 0, err
}
defer func() { _ = rows.Close() }()
row, err := rows.Next()
if err != nil {
return nil, 0, err
}
return row.Dash, row.ResourceVersion, nil
}
// Read implements ResourceStoreServer.
func (a *dashboardSqlAccess) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
info, err := request.ParseNamespace(req.Key.Namespace)
if err == nil {
err = isDashboardKey(req.Key, true)
}
if err != nil {
return nil, err
}
if req.ResourceVersion > 0 {
return nil, fmt.Errorf("reading from history not yet supported")
}
dash, rv, err := a.GetDashboard(ctx, info.OrgID, req.Key.Name)
if err != nil {
return nil, err
}
value, err := json.Marshal(dash)
return &resource.ReadResponse{
ResourceVersion: rv,
Value: value,
}, err
}
// List implements AppendingStore.
func (a *dashboardSqlAccess) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
opts := req.Options
info, err := request.ParseNamespace(opts.Key.Namespace)
if err == nil {
err = isDashboardKey(opts.Key, false)
}
if err != nil {
return nil, err
}
token, err := readContinueToken(req.NextPageToken)
if err != nil {
return nil, err
}
if token.orgId > 0 && token.orgId != info.OrgID {
return nil, fmt.Errorf("token and orgID mismatch")
}
query := &DashboardQuery{
OrgID: info.OrgID,
Limit: int(req.Limit),
MaxBytes: 2 * 1024 * 1024, // 2MB,
MinID: token.id,
Labels: req.Options.Labels,
}
rows, limit, err := a.getRows(ctx, query)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
totalSize := 0
list := &resource.ListResponse{}
for {
row, err := rows.Next()
if err != nil || row == nil {
return list, err
}
totalSize += row.Bytes
if len(list.Items) > 0 && (totalSize > query.MaxBytes || len(list.Items) >= limit) {
// if query.Requirements.Folder != nil {
// row.token.folder = *query.Requirements.Folder
// }
list.NextPageToken = row.token.String() // will skip this one but start here next time
return list, err
}
// TODO -- make it smaller and stick the body as an annotation...
val, err := json.Marshal(row.Dash)
if err != nil {
return list, err
}
list.Items = append(list.Items, &resource.ResourceWrapper{
ResourceVersion: row.ResourceVersion,
Value: val,
})
}
}
// Watch implements AppendingStore.
func (a *dashboardSqlAccess) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
stream := make(chan *resource.WrittenEvent, 10)
{
a.mutex.Lock()
defer a.mutex.Unlock()
// Add the event stream
a.subscribers = append(a.subscribers, stream)
}
// Wait for context done
go func() {
// Wait till the context is done
<-ctx.Done()
// Then remove the subscription
a.mutex.Lock()
defer a.mutex.Unlock()
// Copy all streams without our listener
subs := []chan *resource.WrittenEvent{}
for _, sub := range a.subscribers {
if sub != stream {
subs = append(subs, sub)
}
}
a.subscribers = subs
}()
return stream, nil
}
func (a *dashboardSqlAccess) SupportsSignedURLs() bool {
return false
}
func (a *dashboardSqlAccess) PutBlob(context.Context, *resource.PutBlobRequest) (*resource.PutBlobResponse, error) {
return nil, fmt.Errorf("not implemented yet")
}
func (a *dashboardSqlAccess) GetBlob(ctx context.Context, key *resource.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resource.GetBlobResponse, error) {
ns, err := request.ParseNamespace(key.Namespace)
if err == nil {
err = isDashboardKey(key, true)
}
if err != nil {
return nil, err
}
dash, _, err := a.GetDashboard(ctx, ns.OrgID, key.Name)
if err != nil {
return nil, err
}
rsp := &resource.GetBlobResponse{
ContentType: "application/json",
}
rsp.Value, err = json.Marshal(dash.Spec)
return rsp, err
}

View File

@ -12,16 +12,16 @@ type continueToken struct {
orgId int64
id int64 // the internal id (sort by!)
folder string // from the query
size int64
bytes int64 // information, not a query
}
func readContinueToken(q *DashboardQuery) (continueToken, error) {
func readContinueToken(next string) (continueToken, error) {
var err error
token := continueToken{}
if q.ContinueToken == "" {
if next == "" {
return token, nil
}
parts := strings.Split(q.ContinueToken, "/")
parts := strings.Split(next, "/")
if len(parts) < 3 {
return token, fmt.Errorf("invalid continue token (too few parts)")
}
@ -49,19 +49,19 @@ func readContinueToken(q *DashboardQuery) (continueToken, error) {
}
token.folder = sub[1]
// Check if the folder filter is the same from the previous query
if q.Requirements.Folder == nil {
if token.folder != "" {
return token, fmt.Errorf("invalid token, the folder must match previous query")
}
} else if token.folder != *q.Requirements.Folder {
return token, fmt.Errorf("invalid token, the folder must match previous query")
}
// // Check if the folder filter is the same from the previous query
// if q.Requirements.Folder == nil {
// if token.folder != "" {
// return token, fmt.Errorf("invalid token, the folder must match previous query")
// }
// } else if token.folder != *q.Requirements.Folder {
// return token, fmt.Errorf("invalid token, the folder must match previous query")
// }
return token, err
}
func (r *continueToken) String() string {
return fmt.Sprintf("org:%d/start:%d/folder:%s/%s",
r.orgId, r.id, r.folder, util.ByteCountSI(r.size))
r.orgId, r.id, r.folder, util.ByteCountSI(r.bytes))
}

View File

@ -3,10 +3,8 @@ package access
import (
"context"
"k8s.io/apimachinery/pkg/labels"
dashboardsV0 "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// This does not check if you have permissions!
@ -16,19 +14,17 @@ type DashboardQuery struct {
UID string // to select a single dashboard
Limit int
MaxBytes int
MinID int64 // from continue token
// FolderUID etc
Requirements entity.Requirements
// Post processing label filter
Labels labels.Selector
// The token from previous query
ContinueToken string
// The label requirements
Labels []*resource.Requirement
}
type DashboardAccess interface {
GetDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, error)
GetDashboards(ctx context.Context, query *DashboardQuery) (*dashboardsV0.DashboardList, error)
resource.AppendingStore
resource.BlobStore
GetDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, int64, error)
SaveDashboard(ctx context.Context, orgId int64, dash *dashboardsV0.Dashboard) (*dashboardsV0.Dashboard, bool, error)
DeleteDashboard(ctx context.Context, orgId int64, uid string) (*dashboardsV0.Dashboard, bool, error)

View File

@ -1,315 +1,66 @@
package dashboard
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
common "github.com/grafana/grafana/pkg/apimachinery/apis/common/v0alpha1"
"github.com/grafana/grafana/pkg/apimachinery/utils"
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/access"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var (
_ resource.AppendingStore = (*dashboardStorage)(nil)
_ resource.BlobStore = (*dashboardStorage)(nil)
)
type dashboardStorage struct {
resource common.ResourceInfo
access access.DashboardAccess
tableConverter rest.TableConvertor
// Typically one... the server wrapper
subscribers []chan *resource.WrittenEvent
mutex sync.Mutex
}
// func (s *dashboardStorage) Create(ctx context.Context,
// obj runtime.Object,
// createValidation rest.ValidateObjectFunc,
// options *metav1.CreateOptions,
// ) (runtime.Object, error) {
// info, err := request.NamespaceInfoFrom(ctx, true)
// if err != nil {
// return nil, err
// }
// p, ok := obj.(*v0alpha1.Dashboard)
// if !ok {
// return nil, fmt.Errorf("expected dashboard?")
// }
// // HACK to simplify unique name testing from kubectl
// t := p.Spec.GetNestedString("title")
// if strings.Contains(t, "${NOW}") {
// t = strings.ReplaceAll(t, "${NOW}", fmt.Sprintf("%d", time.Now().Unix()))
// p.Spec.Set("title", t)
// }
// dash, _, err := s.access.SaveDashboard(ctx, info.OrgID, p)
// return dash, err
// }
// func (s *dashboardStorage) Update(ctx context.Context,
// name string,
// objInfo rest.UpdatedObjectInfo,
// createValidation rest.ValidateObjectFunc,
// updateValidation rest.ValidateObjectUpdateFunc,
// forceAllowCreate bool,
// options *metav1.UpdateOptions,
// ) (runtime.Object, bool, error) {
// info, err := request.NamespaceInfoFrom(ctx, true)
// if err != nil {
// return nil, false, err
// }
// created := false
// old, err := s.Get(ctx, name, nil)
// if err != nil {
// return old, created, err
// }
// obj, err := objInfo.UpdatedObject(ctx, old)
// if err != nil {
// return old, created, err
// }
// p, ok := obj.(*v0alpha1.Dashboard)
// if !ok {
// return nil, created, fmt.Errorf("expected dashboard after update")
// }
// _, created, err = s.access.SaveDashboard(ctx, info.OrgID, p)
// if err == nil {
// r, err := s.Get(ctx, name, nil)
// return r, created, err
// }
// return nil, created, err
// }
// // GracefulDeleter
// func (s *dashboardStorage) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
// info, err := request.NamespaceInfoFrom(ctx, true)
// if err != nil {
// return nil, false, err
// }
// return s.access.DeleteDashboard(ctx, info.OrgID, name)
// }
// func (s *dashboardStorage) ListXX(ctx context.Context, options *internalversion.ListOptions) (runtime.Object, error) {
// orgId, err := request.OrgIDForList(ctx)
// if err != nil {
// return nil, err
// }
// // fmt.Printf("LIST: %s\n", options.Continue)
// // translate grafana.app/* label selectors into field requirements
// requirements, newSelector, err := entity.ReadLabelSelectors(options.LabelSelector)
// if err != nil {
// return nil, err
// }
// query := &access.DashboardQuery{
// OrgID: orgId,
// Limit: int(options.Limit),
// MaxBytes: 2 * 1024 * 1024, // 2MB,
// ContinueToken: options.Continue,
// Requirements: requirements,
// Labels: newSelector,
// }
// return s.access.GetDashboards(ctx, query)
// }
func (s *dashboardStorage) SupportsSignedURLs() bool {
return false
}
func (s *dashboardStorage) PutBlob(context.Context, *resource.PutBlobRequest) (*resource.PutBlobResponse, error) {
return nil, fmt.Errorf("not implemented yet")
}
func (s *dashboardStorage) GetBlob(ctx context.Context, resource *resource.ResourceKey, info *utils.BlobInfo, mustProxy bool) (*resource.GetBlobResponse, error) {
return nil, fmt.Errorf("not implemented yet")
}
func getDashbaord(event resource.WriteEvent) (*dashboard.Dashboard, error) {
obj, ok := event.Object.GetRuntimeObject()
if ok && obj != nil {
dash, ok := obj.(*dashboard.Dashboard)
if ok {
return dash, nil
}
}
dash := &dashboard.Dashboard{}
err := json.Unmarshal(event.Value, dash)
return dash, err
}
func isDashboardKey(key *resource.ResourceKey, requireName bool) error {
gr := dashboard.DashboardResourceInfo.GroupResource()
if key.Group != gr.Group {
return fmt.Errorf("expecting dashboard group")
}
if key.Resource != gr.Resource {
return fmt.Errorf("expecting dashboard resource")
}
if requireName && key.Name == "" {
return fmt.Errorf("expecting dashboard name (uid)")
}
return nil
}
func (s *dashboardStorage) WriteEvent(ctx context.Context, event resource.WriteEvent) (rv int64, err error) {
info, err := request.ParseNamespace(event.Key.Namespace)
if err == nil {
err = isDashboardKey(event.Key, true)
}
if err != nil {
return 0, err
}
switch event.Type {
case resource.WatchEvent_DELETED:
{
_, _, err = s.access.DeleteDashboard(ctx, info.OrgID, event.Key.Name)
rv = event.EventID
}
// The difference depends on embedded internal ID
case resource.WatchEvent_ADDED, resource.WatchEvent_MODIFIED:
{
dash, err := getDashbaord(event)
if err != nil {
return 0, err
}
after, _, err := s.access.SaveDashboard(ctx, info.OrgID, dash)
if err != nil {
return 0, err
}
if after != nil {
meta, err := utils.MetaAccessor(after)
if err != nil {
return 0, err
}
rv, err = meta.GetResourceVersionInt64()
}
}
default:
return 0, fmt.Errorf("unsupported event type: %v", event.Type)
}
// Async notify all subscribers (not HA!!!)
if s.subscribers != nil {
go func() {
write := &resource.WrittenEvent{
WriteEvent: event,
Timestamp: time.Now().UnixMilli(),
ResourceVersion: rv,
}
for _, sub := range s.subscribers {
sub <- write
}
}()
}
return rv, err
}
// Read implements ResourceStoreServer.
func (s *dashboardStorage) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
info, err := request.ParseNamespace(req.Key.Namespace)
if err == nil {
err = isDashboardKey(req.Key, true)
}
if err != nil {
return nil, err
}
if req.ResourceVersion > 0 {
return nil, fmt.Errorf("reading from history not yet supported")
}
dash, err := s.access.GetDashboard(ctx, info.OrgID, req.Key.Name)
if err != nil {
return nil, err
}
meta, err := utils.MetaAccessor(dash)
if err != nil {
return nil, err
}
rv, err := meta.GetResourceVersionInt64()
if err != nil {
return nil, err
}
value, err := json.Marshal(dash)
return &resource.ReadResponse{
ResourceVersion: rv,
Value: value,
}, err
}
// List implements AppendingStore.
func (s *dashboardStorage) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
opts := req.Options
info, err := request.ParseNamespace(opts.Key.Namespace)
if err == nil {
err = isDashboardKey(opts.Key, false)
}
func (s *dashboardStorage) newStore(scheme *runtime.Scheme, defaultOptsGetter generic.RESTOptionsGetter) (rest.Storage, error) {
server, err := resource.NewResourceServer(resource.ResourceServerOptions{
Store: s.access,
Blob: s.access,
// WriteAccess: resource.WriteAccessHooks{
// Folder: func(ctx context.Context, user identity.Requester, uid string) bool {
// // ???
// },
// },
})
if err != nil {
return nil, err
}
query := &access.DashboardQuery{
OrgID: info.OrgID,
Limit: int(req.Limit),
MaxBytes: 2 * 1024 * 1024, // 2MB,
ContinueToken: req.NextPageToken,
// Requirements: requirements,
// Labels: newSelector,
resourceInfo := s.resource
defaultOpts, err := defaultOptsGetter.GetRESTOptions(resourceInfo.GroupResource())
if err != nil {
return nil, err
}
fmt.Printf("%+v\n", query)
client := resource.NewLocalResourceStoreClient(server)
optsGetter := apistore.NewRESTOptionsGetter(client,
defaultOpts.StorageConfig.Codec,
)
// return s.access.GetDashboards(ctx, query)
return nil, fmt.Errorf("todo")
}
// Watch implements AppendingStore.
func (s *dashboardStorage) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
stream := make(chan *resource.WrittenEvent, 10)
{
s.mutex.Lock()
defer s.mutex.Unlock()
// Add the event stream
s.subscribers = append(s.subscribers, stream)
strategy := grafanaregistry.NewStrategy(scheme)
store := &genericregistry.Store{
NewFunc: resourceInfo.NewFunc,
NewListFunc: resourceInfo.NewListFunc,
KeyRootFunc: grafanaregistry.KeyRootFunc(resourceInfo.GroupResource()),
KeyFunc: grafanaregistry.NamespaceKeyFunc(resourceInfo.GroupResource()),
PredicateFunc: grafanaregistry.Matcher,
DefaultQualifiedResource: resourceInfo.GroupResource(),
SingularQualifiedResource: resourceInfo.SingularGroupResource(),
CreateStrategy: strategy,
UpdateStrategy: strategy,
DeleteStrategy: strategy,
TableConvertor: s.tableConverter,
}
// Wait for context done
go func() {
// Wait till the context is done
<-ctx.Done()
// Then remove the subscription
s.mutex.Lock()
defer s.mutex.Unlock()
// Copy all streams without our listener
subs := []chan *resource.WrittenEvent{}
for _, sub := range s.subscribers {
if sub != stream {
subs = append(subs, sub)
}
}
s.subscribers = subs
}()
return stream, nil
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
return store, err
}

View File

@ -2,7 +2,9 @@ package dashboard
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -13,17 +15,17 @@ import (
common "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/apiserver/builder"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/registry/apis/dashboard/access"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
gapiutil "github.com/grafana/grafana/pkg/services/apiserver/utils"
"github.com/grafana/grafana/pkg/services/dashboards"
dashver "github.com/grafana/grafana/pkg/services/dashboardversion"
"github.com/grafana/grafana/pkg/services/featuremgmt"
@ -39,9 +41,7 @@ type DashboardsAPIBuilder struct {
dashboardVersionService dashver.Service
accessControl accesscontrol.AccessControl
namespacer request.NamespaceMapper
access access.DashboardAccess
dashStore dashboards.Store
store *dashboardStorage
log log.Logger
}
@ -55,6 +55,7 @@ func RegisterAPIService(cfg *setting.Cfg, features featuremgmt.FeatureToggles,
dashStore dashboards.Store,
reg prometheus.Registerer,
sql db.DB,
tracing *tracing.TracingService,
) *DashboardsAPIBuilder {
if !features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs) {
return nil // skip registration unless opting into experimental apis
@ -62,13 +63,36 @@ func RegisterAPIService(cfg *setting.Cfg, features featuremgmt.FeatureToggles,
namespacer := request.GetNamespaceMapper(cfg)
builder := &DashboardsAPIBuilder{
log: log.New("grafana-apiserver.dashboards"),
dashboardService: dashboardService,
dashboardVersionService: dashboardVersionService,
dashStore: dashStore,
accessControl: accessControl,
namespacer: namespacer,
access: access.NewDashboardAccess(sql, namespacer, dashStore, provisioning),
log: log.New("grafana-apiserver.dashboards"),
store: &dashboardStorage{
resource: dashboard.DashboardResourceInfo,
access: access.NewDashboardAccess(sql, namespacer, dashStore, provisioning),
tableConverter: gapiutil.NewTableConverter(
dashboard.DashboardResourceInfo.GroupResource(),
[]metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name"},
{Name: "Title", Type: "string", Format: "string", Description: "The dashboard name"},
{Name: "Created At", Type: "date"},
},
func(obj any) ([]interface{}, error) {
dash, ok := obj.(*v0alpha1.Dashboard)
if ok {
if dash != nil {
return []interface{}{
dash.Name,
dash.Spec.GetNestedString("title"),
dash.CreationTimestamp.UTC().Format(time.RFC3339),
}, nil
}
}
return nil, fmt.Errorf("expected dashboard or summary")
}),
},
}
apiregistration.RegisterAPI(builder)
return builder
@ -122,40 +146,33 @@ func (b *DashboardsAPIBuilder) GetAPIGroupInfo(
) (*genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(v0alpha1.GROUP, scheme, metav1.ParameterCodec, codecs)
resourceInfo := v0alpha1.DashboardResourceInfo
store, err := newStorage(scheme)
dash := b.store.resource
legacyStore, err := b.store.newStore(scheme, optsGetter)
if err != nil {
return nil, err
}
legacyStore := &dashboardStorage{
resource: resourceInfo,
access: b.access,
tableConverter: store.TableConvertor,
}
fmt.Printf("%v\n", legacyStore)
storage := map[string]rest.Storage{}
//storage[resourceInfo.StoragePath()] = legacyStore
storage[resourceInfo.StoragePath("dto")] = &DTOConnector{
storage[dash.StoragePath()] = legacyStore
storage[dash.StoragePath("dto")] = &DTOConnector{
builder: b,
}
storage[resourceInfo.StoragePath("versions")] = &VersionsREST{
storage[dash.StoragePath("versions")] = &VersionsREST{
builder: b,
}
// Dual writes if a RESTOptionsGetter is provided
if desiredMode != grafanarest.Mode0 && optsGetter != nil {
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: grafanaregistry.GetAttrs}
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
storage[resourceInfo.StoragePath()] = grafanarest.NewDualWriter(
grafanarest.Mode1,
store, //legacyStore,
store,
reg)
}
// // Dual writes if a RESTOptionsGetter is provided
// if desiredMode != grafanarest.Mode0 && optsGetter != nil {
// options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: grafanaregistry.GetAttrs}
// if err := store.CompleteWithOptions(options); err != nil {
// return nil, err
// }
// storage[resourceInfo.StoragePath()] = grafanarest.NewDualWriter(
// grafanarest.Mode1,
// store, //legacyStore,
// store,
// reg)
// }
apiGroupInfo.VersionedResourcesStorageMap[v0alpha1.VERSION] = storage
return &apiGroupInfo, nil

View File

@ -1,60 +0,0 @@
package dashboard
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
gapiutil "github.com/grafana/grafana/pkg/services/apiserver/utils"
)
var _ grafanarest.Storage = (*storage)(nil)
type storage struct {
*genericregistry.Store
}
func newStorage(scheme *runtime.Scheme) (*storage, error) {
strategy := grafanaregistry.NewStrategy(scheme)
resourceInfo := v0alpha1.DashboardResourceInfo
store := &genericregistry.Store{
NewFunc: resourceInfo.NewFunc,
NewListFunc: resourceInfo.NewListFunc,
KeyRootFunc: grafanaregistry.KeyRootFunc(resourceInfo.GroupResource()),
KeyFunc: grafanaregistry.NamespaceKeyFunc(resourceInfo.GroupResource()),
PredicateFunc: grafanaregistry.Matcher,
DefaultQualifiedResource: resourceInfo.GroupResource(),
SingularQualifiedResource: resourceInfo.SingularGroupResource(),
CreateStrategy: strategy,
UpdateStrategy: strategy,
DeleteStrategy: strategy,
}
store.TableConvertor = gapiutil.NewTableConverter(
store.DefaultQualifiedResource,
[]metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name"},
{Name: "Title", Type: "string", Format: "string", Description: "The dashboard name"},
{Name: "Created At", Type: "date"},
},
func(obj any) ([]interface{}, error) {
dash, ok := obj.(*v0alpha1.Dashboard)
if ok {
if dash != nil {
return []interface{}{
dash.Name,
dash.Spec.GetNestedString("title"),
dash.CreationTimestamp.UTC().Format(time.RFC3339),
}, nil
}
}
return nil, fmt.Errorf("expected dashboard or summary")
})
return &storage{Store: store}, nil
}

View File

@ -2,6 +2,7 @@ package dashboard
import (
"context"
"encoding/json"
"fmt"
"net/http"
@ -9,6 +10,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
dashboard "github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/infra/slugify"
@ -16,6 +18,7 @@ import (
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/guardian"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// The DTO returns everything the UI needs in a single request
@ -23,8 +26,10 @@ type DTOConnector struct {
builder *DashboardsAPIBuilder
}
var _ = rest.Connecter(&DTOConnector{})
var _ = rest.StorageMetadata(&DTOConnector{})
var (
_ rest.Connecter = (*DTOConnector)(nil)
_ rest.StorageMetadata = (*DTOConnector)(nil)
)
func (r *DTOConnector) New() runtime.Object {
return &dashboard.DashboardWithAccessInfo{}
@ -88,10 +93,35 @@ func (r *DTOConnector) Connect(ctx context.Context, name string, opts runtime.Ob
r.getAnnotationPermissionsByScope(ctx, user, &access.AnnotationsPermissions.Dashboard, accesscontrol.ScopeAnnotationsTypeDashboard)
r.getAnnotationPermissionsByScope(ctx, user, &access.AnnotationsPermissions.Organization, accesscontrol.ScopeAnnotationsTypeOrganization)
dash, err := r.builder.access.GetDashboard(ctx, info.OrgID, name)
key := &resource.ResourceKey{
Namespace: info.Value,
Group: dashboard.GROUP,
Resource: dashboard.DashboardResourceInfo.GroupResource().Resource,
Name: name,
}
store := r.builder.store.access
rsp, err := store.Read(ctx, &resource.ReadRequest{Key: key})
if err != nil {
return nil, err
}
dash := &dashboard.Dashboard{}
err = json.Unmarshal(rsp.Value, dash)
if err != nil {
return nil, err
}
// TODO, load the full spec from blob storage
if false {
blob, err := store.GetBlob(ctx, key, &utils.BlobInfo{UID: "dto"}, true)
if err != nil {
return nil, err
}
err = json.Unmarshal(blob.Value, &dash.Spec)
if err != nil {
return nil, err
}
}
access.Slug = slugify.Slugify(dash.Spec.GetNestedString("title"))
access.Url = dashboards.GetDashboardFolderURL(false, name, access.Slug)

View File

@ -272,8 +272,8 @@ func (s *service) start(ctx context.Context) error {
return err
}
store := resource.NewResourceStoreClientLocal(resourceServer)
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetter(s.cfg, store,
store := resource.NewLocalResourceStoreClient(resourceServer)
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetter(store,
o.RecommendedOptions.Etcd.StorageConfig.Codec)
case grafanaapiserveroptions.StorageTypeUnified:

View File

@ -3,7 +3,6 @@
package apistore
import (
"encoding/json"
"path"
"time"
@ -16,40 +15,31 @@ import (
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/tools/cache"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
type RESTOptionsGetter struct {
cfg *setting.Cfg
store resource.ResourceStoreClient
Codec runtime.Codec
}
func NewRESTOptionsGetter(cfg *setting.Cfg, store resource.ResourceStoreClient, codec runtime.Codec) *RESTOptionsGetter {
func NewRESTOptionsGetter(store resource.ResourceStoreClient, codec runtime.Codec) *RESTOptionsGetter {
return &RESTOptionsGetter{
cfg: cfg,
store: store,
Codec: codec,
}
}
func (f *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
// build connection string to uniquely identify the storage backend
connectionInfo, err := json.Marshal(f.cfg.SectionWithEnvOverrides("resource_store").KeysHash())
if err != nil {
return generic.RESTOptions{}, err
}
storageConfig := &storagebackend.ConfigForResource{
Config: storagebackend.Config{
Type: "custom",
Prefix: "",
Transport: storagebackend.TransportConfig{
ServerList: []string{
string(connectionInfo),
// ??? string(connectionInfo),
},
},
Codec: f.Codec,

View File

@ -285,6 +285,7 @@ func toListRequest(ctx context.Context, opts storage.ListOptions) (*resource.Lis
Options: &resource.ListOptions{
Key: key,
},
NextPageToken: predicate.Continue,
}
if opts.Predicate.Label != nil && !opts.Predicate.Label.Empty() {
@ -409,7 +410,9 @@ func (s *Storage) GetList(ctx context.Context, _ string, opts storage.ListOption
if rsp.RemainingItemCount > 0 {
listAccessor.SetRemainingItemCount(&rsp.RemainingItemCount)
}
listAccessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
if rsp.ResourceVersion > 0 {
listAccessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
}
return nil
}

View File

@ -9,7 +9,7 @@ import (
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
)
func NewResourceStoreClientLocal(server ResourceStoreServer) ResourceStoreClient {
func NewLocalResourceStoreClient(server ResourceStoreServer) ResourceStoreClient {
channel := &inprocgrpc.Channel{}
auth := &grpcUtils.Authenticator{}
@ -25,6 +25,6 @@ func NewResourceStoreClientLocal(server ResourceStoreServer) ResourceStoreClient
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
}
func NewEntityStoreClientGRPC(channel *grpc.ClientConn) ResourceStoreClient {
func NewResourceStoreClientGRPC(channel *grpc.ClientConn) ResourceStoreClient {
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
}