now with event setup

This commit is contained in:
Ryan McKinley 2024-06-12 13:02:06 +03:00
parent d83aa7f865
commit 16471fa057
9 changed files with 479 additions and 193 deletions

View File

@ -0,0 +1,219 @@
package resource
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync/atomic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/services/apiserver/utils"
"github.com/grafana/grafana/pkg/services/auth/identity"
)
type WriteEvent struct {
EventID int64
Key *Key // the request key
Requester identity.Requester
Operation ResourceOperation
PreviousRV int64 // only for Update+Delete
Value []byte
Object utils.GrafanaMetaAccessor
OldObject utils.GrafanaMetaAccessor
// Change metadata
FolderChanged bool
// The status will be populated for any error
Status *StatusResult
Error error
}
func (e *WriteEvent) BadRequest(err error, message string, a ...any) *WriteEvent {
e.Error = err
e.Status = &StatusResult{
Status: "Failure",
Message: fmt.Sprintf(message, a...),
Code: http.StatusBadRequest,
}
return e
}
// Verify that all required fields are set, and the user has permission to set the common metadata fields
type EventValidator interface {
PrepareCreate(ctx context.Context, req *CreateRequest) (*WriteEvent, error)
PrepareUpdate(ctx context.Context, req *UpdateRequest, current []byte) (*WriteEvent, error)
}
type EventValidatorOptions struct {
// Get the next EventID
NextEventID func() int64
// Check if a user has access to write folders
// When this is nil, no resources can have folders configured
FolderAccess func(ctx context.Context, user identity.Requester, uid string) bool
// When configured, this will make sure a user is allowed to save to a given origin
OriginAccess func(ctx context.Context, user identity.Requester, origin string) bool
}
type eventValidator struct {
opts EventValidatorOptions
}
func NewEventValidator(opts EventValidatorOptions) EventValidator {
if opts.NextEventID == nil {
counter := atomic.Int64{}
opts.NextEventID = func() int64 {
return counter.Add(1)
}
}
return &eventValidator{opts}
}
type dummyObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
}
var _ EventValidator = &eventValidator{}
func (v *eventValidator) newEvent(ctx context.Context, key *Key, value, oldValue []byte) *WriteEvent {
var err error
event := &WriteEvent{
EventID: v.opts.NextEventID(),
Key: key,
Value: value,
}
event.Requester, err = appcontext.User(ctx)
if err != nil {
return event.BadRequest(err, "unable to get user")
}
dummy := &dummyObject{}
err = json.Unmarshal(value, dummy)
if err != nil {
return event.BadRequest(err, "error reading json")
}
obj, err := utils.MetaAccessor(dummy)
if err != nil {
return event.BadRequest(err, "invalid object in json")
}
if obj.GetUID() == "" {
return event.BadRequest(nil, "the UID must be set")
}
if obj.GetGenerateName() != "" {
return event.BadRequest(nil, "can not save value with generate name")
}
if obj.GetKind() == "" {
return event.BadRequest(nil, "expecting resources with a kind in the body")
}
if obj.GetName() != key.Name {
return event.BadRequest(nil, "key name does not match the name in the body")
}
if obj.GetNamespace() != key.Namespace {
return event.BadRequest(nil, "key namespace does not match the namespace in the body")
}
folder := obj.GetFolder()
if folder != "" {
if v.opts.FolderAccess == nil {
return event.BadRequest(err, "folders are not supported")
} else if !v.opts.FolderAccess(ctx, event.Requester, folder) {
return event.BadRequest(err, "unable to add resource to folder") // 403?
}
}
origin, err := obj.GetOriginInfo()
if err != nil {
return event.BadRequest(err, "invalid origin info")
}
if origin != nil && v.opts.OriginAccess != nil {
if !v.opts.OriginAccess(ctx, event.Requester, origin.Name) {
return event.BadRequest(err, "not allowed to write resource to origin (%s)", origin.Name)
}
}
event.Object = obj
// This is an update
if oldValue != nil {
dummy := &dummyObject{}
err = json.Unmarshal(oldValue, dummy)
if err != nil {
return event.BadRequest(err, "error reading old json value")
}
old, err := utils.MetaAccessor(dummy)
if err != nil {
return event.BadRequest(err, "invalid object inside old json")
}
if key.Name != old.GetName() {
return event.BadRequest(err, "the old value has a different name (%s != %s)", key.Name, old.GetName())
}
// Can not change creation timestamps+user
if obj.GetCreatedBy() != old.GetCreatedBy() {
return event.BadRequest(err, "can not change the created by metadata (%s != %s)", obj.GetCreatedBy(), old.GetCreatedBy())
}
if obj.GetCreationTimestamp() != old.GetCreationTimestamp() {
return event.BadRequest(err, "can not change the CreationTimestamp metadata (%v != %v)", obj.GetCreationTimestamp(), old.GetCreationTimestamp())
}
oldFolder := obj.GetFolder()
if oldFolder != folder {
event.FolderChanged = true
}
event.OldObject = old
} else if folder != "" {
event.FolderChanged = true
}
return event
}
func (v *eventValidator) PrepareCreate(ctx context.Context, req *CreateRequest) (*WriteEvent, error) {
event := v.newEvent(ctx, req.Key, req.Value, nil)
event.Operation = ResourceOperation_CREATED
if event.Status != nil {
return event, nil
}
// Make sure the created by user is accurate
//----------------------------------------
val := event.Object.GetCreatedBy()
if val != "" && val != event.Requester.GetUID().String() {
return event.BadRequest(nil, "created by annotation does not match: metadata.annotations#"+utils.AnnoKeyCreatedBy), nil
}
// Create can not have updated properties
//----------------------------------------
if event.Object.GetUpdatedBy() != "" {
return event.BadRequest(nil, "unexpected metadata.annotations#"+utils.AnnoKeyCreatedBy), nil
}
ts, err := event.Object.GetUpdatedTimestamp()
if err != nil {
return event.BadRequest(nil, fmt.Sprintf("invalid timestamp: %s", err)), nil
}
if ts != nil {
return event.BadRequest(nil, "unexpected metadata.annotations#"+utils.AnnoKeyUpdatedTimestamp), nil
}
return event, nil
}
func (v *eventValidator) PrepareUpdate(ctx context.Context, req *UpdateRequest, current []byte) (*WriteEvent, error) {
event := v.newEvent(ctx, req.Key, req.Value, current)
event.Operation = ResourceOperation_UPDATED
if event.Status != nil {
return event, nil
}
// Make sure the update user is accurate
//----------------------------------------
val := event.Object.GetUpdatedBy()
if val != "" && val != event.Requester.GetUID().String() {
return event.BadRequest(nil, "created by annotation does not match: metadata.annotations#"+utils.AnnoKeyUpdatedBy), nil
}
return event, nil
}

View File

@ -0,0 +1,50 @@
package resource
import (
"bytes"
"fmt"
)
// NamespacedPath is a path that can be used to isolate tenant data
// NOTE: this strategy does not allow quickly searching across namespace boundaries with a prefix
func (x *Key) NamespacedPath() string {
var buffer bytes.Buffer
if x.Namespace == "" {
buffer.WriteString("__cluster__")
} else {
buffer.WriteString(x.Namespace)
}
if x.Group == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Group)
if x.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Resource)
if x.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(x.Name)
if x.ResourceVersion > 0 {
buffer.WriteString("/")
buffer.WriteString(fmt.Sprintf("%.20d", x.ResourceVersion))
}
return buffer.String()
}
// Return a copy without the resource version
func (x *Key) WithoutResourceVersion() *Key {
return &Key{
Namespace: x.Namespace,
Group: x.Group,
Resource: x.Resource,
Name: x.Name,
}
}

View File

@ -0,0 +1,31 @@
package resource_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/services/store/resource"
)
func TestResourceModels(t *testing.T) {
t.Run("key namespaced path", func(t *testing.T) {
key := &resource.Key{}
require.Equal(t, "__cluster__", key.NamespacedPath())
key.Namespace = "ns"
require.Equal(t, "ns", key.NamespacedPath())
key.Group = "ggg"
require.Equal(t, "ns/ggg", key.NamespacedPath())
key.Resource = "rrr"
require.Equal(t, "ns/ggg/rrr", key.NamespacedPath())
key.Name = "nnnn"
require.Equal(t, "ns/ggg/rrr/nnnn", key.NamespacedPath())
key.ResourceVersion = 1234
require.Equal(t, "ns/ggg/rrr/nnnn/00000000000000001234", key.NamespacedPath())
})
}

View File

@ -0,0 +1,62 @@
package sqlstash
import (
"context"
"embed"
"encoding/json"
"testing"
"github.com/stretchr/testify/require"
playlist "github.com/grafana/grafana/pkg/apis/playlist/v0alpha1"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/models/roletype"
"github.com/grafana/grafana/pkg/services/store/resource"
"github.com/grafana/grafana/pkg/services/user"
)
func TestSQLCommands(t *testing.T) {
ctx := appcontext.WithUser(context.Background(), &user.SignedInUser{
UserID: 123,
UserUID: "u123",
OrgRole: roletype.RoleAdmin,
})
validator := resource.NewEventValidator(resource.EventValidatorOptions{
// no folders for now
})
t.Run("insert playlist SQL", func(t *testing.T) {
input := testdataFromJSON(t, "01_create_playlist.json", &playlist.Playlist{})
key, err := resource.ObjectKey(playlist.PlaylistResourceInfo.GroupResource(), input)
require.NoError(t, err)
req := &resource.CreateRequest{Key: key, Message: "test commit"}
req.Value, err = json.Marshal(input)
require.NoError(t, err)
require.Equal(t, "default/playlist.grafana.app/playlists/fdgsv37qslr0ga", key.NamespacedPath())
evt, err := validator.PrepareCreate(ctx, req)
require.NoError(t, err)
require.NoError(t, evt.Error)
require.Nil(t, evt.Error)
})
}
//go:embed testdata/*
var testdataFS embed.FS
func testdata(t *testing.T, filename string) []byte {
t.Helper()
b, err := testdataFS.ReadFile(`testdata/` + filename)
require.NoError(t, err)
return b
}
func testdataFromJSON[T any](t *testing.T, filename string, dest T) T {
t.Helper()
b := testdata(t, filename)
err := json.Unmarshal(b, dest)
require.NoError(t, err)
return dest
}

View File

@ -72,7 +72,7 @@ type sqlResourceServer struct {
cancel context.CancelFunc
stream chan *resource.WatchResponse
tracer trace.Tracer
validator resource.RequestValidator
validator resource.EventValidator
once sync.Once
initErr error
@ -138,7 +138,9 @@ func (s *sqlResourceServer) init() error {
s.sess = sess
s.dialect = migrator.NewDialect(engine.DriverName())
s.validator = resource.NewSimpleValidator()
s.validator = resource.NewEventValidator(resource.EventValidatorOptions{
// use snowflake IDs
})
// set up the broadcaster
s.broadcaster, err = sqlstash.NewBroadcaster(s.ctx, func(stream chan *resource.WatchResponse) error {
@ -198,16 +200,25 @@ func (s *sqlResourceServer) Create(ctx context.Context, req *resource.CreateRequ
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()
if req.Key.ResourceVersion > 0 {
return &resource.CreateResponse{
Status: badRequest("can not update a specific resource version"),
}, nil
}
if err := s.Init(); err != nil {
return nil, err
}
obj, status := s.validator.ValidateCreate(ctx, req)
if status != nil {
return &resource.CreateResponse{Status: status}, nil
event, err := s.validator.PrepareCreate(ctx, req)
if err != nil {
return nil, err
}
if event.Status != nil {
return &resource.CreateResponse{Status: event.Status}, nil
}
fmt.Printf("TODO, CREATE: %v", obj.GetName())
fmt.Printf("TODO, CREATE: %v", event)
return nil, ErrNotImplementedYet
}
@ -216,28 +227,42 @@ func (s *sqlResourceServer) Update(ctx context.Context, req *resource.UpdateRequ
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()
if req.Key.ResourceVersion < 0 {
return &resource.UpdateResponse{
Status: badRequest("update must include the previous version"),
}, nil
}
if err := s.Init(); err != nil {
return nil, err
}
old, err := s.GetResource(ctx, &resource.GetResourceRequest{
Key: req.Key,
latest, err := s.GetResource(ctx, &resource.GetResourceRequest{
Key: req.Key.WithoutResourceVersion(),
})
if err != nil {
return nil, err
}
if old.Value == nil {
if latest.Value == nil {
return &resource.UpdateResponse{
Status: badRequest("existing value does not exist"),
}, nil
}
obj, status := s.validator.ValidateUpdate(ctx, req, old)
if status != nil {
return &resource.UpdateResponse{Status: status}, nil
if latest.ResourceVersion != req.Key.ResourceVersion {
return &resource.UpdateResponse{
Status: badRequest("not the latest resource version"),
}, nil
}
fmt.Printf("TODO, UPDATE: %+v", obj.GetName())
event, err := s.validator.PrepareUpdate(ctx, req, latest.Value)
if err != nil {
return nil, err
}
if event.Status != nil {
return &resource.UpdateResponse{Status: event.Status}, nil
}
fmt.Printf("TODO, UPDATE: %v", event)
return nil, ErrNotImplementedYet
}

View File

@ -0,0 +1,25 @@
{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "fdgsv37qslr0ga",
"namespace": "default",
"annotations": {
"grafana.app/originName": "elsewhere",
"grafana.app/originPath": "path/to/item",
"grafana.app/originTimestamp": "2024-02-02T00:00:00Z"
},
"creationTimestamp": "2024-03-03T00:00:00Z",
"uid": "8tGrXJgGbFI0"
},
"spec": {
"title": "hello",
"interval": "5m",
"items": [
{
"type": "dashboard_by_uid",
"value": "vmie2cmWz"
}
]
}
}

View File

@ -0,0 +1,25 @@
{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "fdgsv37qslr0ga",
"namespace": "default",
"annotations": {
"grafana.app/originName": "elsewhere",
"grafana.app/originPath": "path/to/item",
"grafana.app/originTimestamp": "2024-02-02T00:00:00Z"
},
"creationTimestamp": "2024-03-03T00:00:00Z",
"uid": "8tGrXJgGbFI0"
},
"spec": {
"title": "hello",
"interval": "5m",
"items": [
{
"type": "dashboard_by_uid",
"value": "vmie2cmWz"
}
]
}
}

View File

@ -2,13 +2,35 @@ package resource
import (
"fmt"
"net/http"
"strconv"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func badRequest(format string, a ...any) *StatusResult {
return &StatusResult{
Status: "Failure",
Message: fmt.Sprintf(format, a...),
Code: http.StatusBadRequest,
// ObjectKey creates a key for a given object
func ObjectKey(gr schema.GroupResource, obj metav1.Object) (*Key, error) {
if gr.Group == "" {
return nil, fmt.Errorf("missing group")
}
if gr.Resource == "" {
return nil, fmt.Errorf("missing resource")
}
if obj.GetName() == "" {
return nil, fmt.Errorf("object is missing name")
}
key := &Key{
Group: gr.Group,
Resource: gr.Resource,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
if obj.GetResourceVersion() != "" {
var err error
key.ResourceVersion, err = strconv.ParseInt(obj.GetResourceVersion(), 10, 64)
if err != nil {
return nil, fmt.Errorf("storage requires numeric revision version %w", err)
}
}
return key, nil
}

View File

@ -1,173 +0,0 @@
package resource
import (
"context"
"encoding/json"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/services/apiserver/utils"
"github.com/grafana/grafana/pkg/services/auth/identity"
)
// Verify that all required fields are set, and the user has permission to set the common metadata fields
type RequestValidator interface {
ValidateCreate(ctx context.Context, req *CreateRequest) (utils.GrafanaMetaAccessor, *StatusResult)
ValidateUpdate(ctx context.Context, req *UpdateRequest, current *GetResourceResponse) (utils.GrafanaMetaAccessor, *StatusResult)
}
type simpleValidator struct {
folderAccess func(ctx context.Context, user identity.Requester, uid string) bool
originAccess func(ctx context.Context, user identity.Requester, origin string) bool
}
func NewSimpleValidator() RequestValidator {
return &simpleValidator{
// folderAccess: func(ctx context.Context, user identity.Requester, uid string) bool {
// return true // for now you can right anything to any folder
// },
}
}
type dummyObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
}
var _ RequestValidator = &simpleValidator{}
func readValue(ctx context.Context, key *Key, value []byte) (identity.Requester, utils.GrafanaMetaAccessor, *StatusResult) {
// TODO -- we just need Identity not a full user!
user, err := appcontext.User(ctx)
if err != nil {
return nil, nil, badRequest(fmt.Sprintf("unable to get user // %s", err))
}
dummy := &dummyObject{}
err = json.Unmarshal(value, dummy)
if err != nil {
return nil, nil, badRequest(fmt.Sprintf("error reading json // %s", err))
}
obj, err := utils.MetaAccessor(dummy)
if err != nil {
return user, obj, badRequest(fmt.Sprintf("invalid object // %s", err))
}
if obj.GetName() != key.Name {
return user, obj, badRequest("key name does not match the name in the body")
}
if obj.GetNamespace() != key.Namespace {
return user, obj, badRequest("key namespace does not match the namespace in the body")
}
if obj.GetKind() != key.Resource {
return user, obj, badRequest("key resource in the body does not match the key (%s != %s)", obj.GetKind(), key.Resource)
}
return user, obj, nil
}
// This is the validation that happens for both CREATE and UPDATE
func (v *simpleValidator) validate(ctx context.Context, user identity.Requester, obj utils.GrafanaMetaAccessor) (utils.GrafanaMetaAccessor, *StatusResult) {
// To avoid confusion, lets not include the resource version in the saved value
// This is a little weird, but it means there won't be confusion that the saved value
// is likely the previous resource version!
if obj.GetResourceVersion() != "" {
return obj, badRequest("do not save the resource version in the value")
}
// Make sure all common fields are populated
if obj.GetName() == "" {
return obj, badRequest("missing name")
}
if obj.GetAPIVersion() == "" {
return obj, badRequest("missing apiversion")
}
if obj.GetUID() == "" {
return obj, badRequest("the uid is not configured")
}
// Check folder access
folder := obj.GetFolder()
if folder != "" {
if v.folderAccess == nil {
return obj, badRequest("folder access not supported")
} else if !v.folderAccess(ctx, user, folder) {
return obj, badRequest("not allowed to write resource to folder")
}
}
// Make sure you can write values to this origin
origin, err := obj.GetOriginInfo()
if err != nil {
return nil, badRequest(fmt.Sprintf("error reading origin // %s", err))
}
if origin != nil && v.originAccess != nil && !v.originAccess(ctx, user, origin.Name) {
return obj, badRequest("not allowed to write values to this origin")
}
return obj, nil
}
func (v *simpleValidator) ValidateCreate(ctx context.Context, req *CreateRequest) (utils.GrafanaMetaAccessor, *StatusResult) {
user, obj, errstatus := readValue(ctx, req.Key, req.Value)
if errstatus != nil {
return nil, errstatus
}
if req.Key.ResourceVersion > 0 {
return obj, badRequest("create key must not include a resource version")
}
// Make sure the created by user is accurate
//----------------------------------------
val := obj.GetCreatedBy()
if val != "" && val != user.GetUID().String() {
return obj, badRequest("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyCreatedBy)
}
// Create can not have updated properties
//----------------------------------------
if obj.GetUpdatedBy() != "" {
return obj, badRequest("unexpected metadata.annotations#" + utils.AnnoKeyCreatedBy)
}
ts, err := obj.GetUpdatedTimestamp()
if err != nil {
return obj, badRequest(fmt.Sprintf("invalid timestamp: %s", err))
}
if ts != nil {
return obj, badRequest("unexpected metadata.annotations#" + utils.AnnoKeyUpdatedTimestamp)
}
return v.validate(ctx, user, obj)
}
func (v *simpleValidator) ValidateUpdate(ctx context.Context, req *UpdateRequest, current *GetResourceResponse) (utils.GrafanaMetaAccessor, *StatusResult) {
user, obj, errstatus := readValue(ctx, req.Key, req.Value)
if errstatus != nil {
return nil, errstatus
}
if req.Key.ResourceVersion > 0 && req.Key.ResourceVersion != current.ResourceVersion {
return obj, badRequest("resource version does not match (optimistic locking)")
}
_, oldobj, errstatus := readValue(ctx, req.Key, current.Value)
if errstatus != nil {
return nil, errstatus
}
if obj.GetCreatedBy() != oldobj.GetCreatedBy() {
return obj, badRequest(utils.AnnoKeyCreatedBy + " value has changed")
}
if obj.GetCreationTimestamp() != oldobj.GetCreationTimestamp() {
return obj, badRequest("creation time changed")
}
// Make sure the update user is accurate
//----------------------------------------
val := obj.GetUpdatedBy()
if val != "" && val != user.GetUID().String() {
return obj, badRequest("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyUpdatedBy)
}
return v.validate(ctx, user, obj)
}