has a watch bridge... but not working

This commit is contained in:
Ryan McKinley 2024-06-21 14:53:23 +03:00
parent b771d38d78
commit ec925bab80
6 changed files with 188 additions and 20 deletions

View File

@ -100,7 +100,7 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
return fmt.Errorf("this was not a create operation... (%s)", rsp.Status.String())
}
err = entityToResource(rsp.Entity, out, s.codec)
err = EntityToRuntimeObject(rsp.Entity, out, s.codec)
if err != nil {
return apierrors.NewInternalError(err)
}
@ -140,7 +140,7 @@ func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, pr
return err
}
err = entityToResource(rsp.Entity, out, s.codec)
err = EntityToRuntimeObject(rsp.Entity, out, s.codec)
if err != nil {
return apierrors.NewInternalError(err)
}
@ -328,7 +328,7 @@ func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions,
return apierrors.NewNotFound(s.gr, k.Name)
}
err = entityToResource(rsp, objPtr, s.codec)
err = EntityToRuntimeObject(rsp, objPtr, s.codec)
if err != nil {
return apierrors.NewInternalError(err)
}
@ -394,7 +394,7 @@ func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOpti
for _, r := range rsp.Versions {
res := s.newFunc()
err := entityToResource(r, res, s.codec)
err := EntityToRuntimeObject(r, res, s.codec)
if err != nil {
return apierrors.NewInternalError(err)
}
@ -466,7 +466,7 @@ func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOpti
for _, r := range rsp.Results {
res := s.newFunc()
err := entityToResource(r, res, s.codec)
err := EntityToRuntimeObject(r, res, s.codec)
if err != nil {
return apierrors.NewInternalError(err)
}
@ -582,7 +582,7 @@ func (s *Storage) GuaranteedUpdate(
return err
}
err = entityToResource(rsp.Entity, destination, s.codec)
err = EntityToRuntimeObject(rsp.Entity, destination, s.codec)
if err != nil {
return apierrors.NewInternalError(err)
}
@ -605,7 +605,7 @@ func (s *Storage) GuaranteedUpdate(
return nil // destination is already set
}
err = entityToResource(rsp.Entity, destination, s.codec)
err = EntityToRuntimeObject(rsp.Entity, destination, s.codec)
if err != nil {
return apierrors.NewInternalError(err)
}
@ -676,7 +676,7 @@ decode:
return watch.Bookmark, obj, nil
}
err = entityToResource(resp.Entity, obj, d.codec)
err = EntityToRuntimeObject(resp.Entity, obj, d.codec)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return watch.Error, nil, err
@ -710,7 +710,7 @@ decode:
prevMatches := false
prevObj := d.newFunc()
if resp.Previous != nil {
err = entityToResource(resp.Previous, prevObj, d.codec)
err = EntityToRuntimeObject(resp.Previous, prevObj, d.codec)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return watch.Error, nil, err
@ -751,7 +751,7 @@ decode:
// if we have a previous object, return that in the deleted event
if resp.Previous != nil {
err = entityToResource(resp.Previous, obj, d.codec)
err = EntityToRuntimeObject(resp.Previous, obj, d.codec)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return watch.Error, nil, err

View File

@ -19,7 +19,7 @@ import (
entityStore "github.com/grafana/grafana/pkg/services/store/entity"
)
func entityToResource(rsp *entityStore.Entity, res runtime.Object, codec runtime.Codec) error {
func EntityToRuntimeObject(rsp *entityStore.Entity, res runtime.Object, codec runtime.Codec) error {
var err error
// Read the body first -- it includes old resourceVersion!

View File

@ -201,7 +201,7 @@ func TestEntityToResource(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.entity.Key+" to resource conversion should succeed", func(t *testing.T) {
var p v0alpha1.Playlist
err := entityToResource(tc.entity, &p, Codecs.LegacyCodec(v0alpha1.PlaylistResourceInfo.GroupVersion()))
err := EntityToRuntimeObject(tc.entity, &p, Codecs.LegacyCodec(v0alpha1.PlaylistResourceInfo.GroupVersion()))
require.NoError(t, err)
assert.Equal(t, tc.expectedApiVersion, p.TypeMeta.APIVersion)
assert.Equal(t, tc.expectedCreationTimestamp.Unix(), p.ObjectMeta.CreationTimestamp.Unix())

View File

@ -0,0 +1,119 @@
package entitybridge
import (
"errors"
"io"
"time"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
entitystore "github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
type decoder struct {
client entity.EntityStore_WatchClient
}
// Any errors will end the stream
func (d *decoder) next() (*resource.WrittenEvent, error) {
decode:
for {
err := d.client.Context().Err()
if err != nil {
klog.Errorf("client: context error: %s\n", err)
return nil, err
}
rsp, err := d.client.Recv()
if errors.Is(err, io.EOF) {
return nil, err
}
if grpcStatus.Code(err) == grpcCodes.Canceled {
return nil, err
}
if err != nil {
klog.Errorf("client: error receiving result: %s", err)
return nil, err
}
if rsp.Entity == nil {
klog.Errorf("client: received nil entity\n")
continue decode
}
event := resource.WriteEvent{
Key: &resource.ResourceKey{
Group: rsp.Entity.Namespace,
Resource: rsp.Entity.Resource,
Namespace: rsp.Entity.Namespace,
Name: rsp.Entity.Name,
},
}
switch rsp.Entity.Action {
case entity.Entity_CREATED:
event.Type = resource.WatchEvent_ADDED
case entity.Entity_UPDATED:
event.Type = resource.WatchEvent_MODIFIED
case entity.Entity_DELETED:
event.Type = resource.WatchEvent_DELETED
default:
klog.Errorf("unsupported action\n")
continue decode
}
// Now decode the bytes into an object
obj := &unstructured.Unstructured{}
err = entitystore.EntityToRuntimeObject(rsp.Entity, obj, unstructured.UnstructuredJSONScheme)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return nil, err
}
event.Value, err = obj.MarshalJSON()
if err != nil {
return nil, err
}
event.Object, err = utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
// Decode the old value
if rsp.Previous != nil {
err = entitystore.EntityToRuntimeObject(rsp.Previous, obj, unstructured.UnstructuredJSONScheme)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return nil, err
}
event.ObjectOld, err = utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
event.PreviousRV, err = event.ObjectOld.GetResourceVersionInt64()
if err != nil {
return nil, err
}
}
return &resource.WrittenEvent{
ResourceVersion: rsp.Entity.ResourceVersion,
Timestamp: time.Now().UnixMilli(),
WriteEvent: event,
}, nil
}
}
func (d *decoder) close() {
err := d.client.CloseSend()
if err != nil {
klog.Errorf("error closing watch stream: %s", err)
}
}

View File

@ -8,6 +8,7 @@ import (
"time"
"gocloud.dev/blob/fileblob"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@ -193,7 +194,42 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent
}
func (b *entityBridge) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
return nil, resource.ErrNotImplementedYet
client, err := b.client.Watch(ctx)
if err != nil {
return nil, err
}
req := &entity.EntityWatchRequest{
Action: entity.EntityWatchRequest_START,
Labels: map[string]string{},
WithBody: true,
WithStatus: true,
SendInitialEvents: false,
}
err = client.Send(req)
if err != nil {
err2 := client.CloseSend()
if err2 != nil {
klog.Errorf("watch close failed: %s\n", err2)
}
return nil, err
}
reader := &decoder{client}
stream := make(chan *resource.WrittenEvent, 10)
go func() {
for {
evt, err := reader.next()
if err != nil {
reader.close()
close(stream)
return
}
stream <- evt
}
}()
return stream, nil
}
// IsHealthy implements ResourceServer.

View File

@ -149,7 +149,13 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
}
// Make this cancelable
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(identity.WithRequester(context.Background(),
&identity.StaticRequester{
Namespace: identity.NamespaceServiceAccount,
Login: "watcher", // admin user for watch
UserID: 1,
IsGrafanaAdmin: true,
}))
return &server{
tracer: opts.Tracer,
log: slog.Default().With("logger", "resource-server"),
@ -179,7 +185,7 @@ type server struct {
lifecycle LifecycleHooks
now func() int64
// Background watch task
// Background watch task -- this has permissions for everything
ctx context.Context
cancel context.CancelFunc
broadcaster Broadcaster[*WrittenEvent]
@ -200,8 +206,14 @@ func (s *server) Init() error {
}
}
// Start listening for changes
s.initWatcher()
// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
}
if s.initErr != nil {
s.log.Error("error initializing resource server", "error", s.initErr)
}
})
return s.initErr
}
@ -579,9 +591,10 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
// Currently sending *every* event
if req.Options.Labels != nil {
// match *either* the old or new object
}
// if req.Options.Labels != nil {
// // match *either* the old or new object
// }
// TODO: return values that match either the old or the new
srv.Send(&WatchEvent{
Timestamp: event.Timestamp,