ResourceStore: remove the entity bridge (#90995)

This commit is contained in:
Georges Chaudy
2024-07-30 16:29:25 +02:00
committed by GitHub
parent 896f4889b6
commit 68dd311337
3 changed files with 0 additions and 366 deletions

View File

@@ -1,119 +0,0 @@
package entitybridge
import (
"errors"
"io"
"time"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
entitystore "github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
type decoder struct {
client entity.EntityStore_WatchClient
}
// Any errors will end the stream
func (d *decoder) next() (*resource.WrittenEvent, error) {
decode:
for {
err := d.client.Context().Err()
if err != nil {
klog.Errorf("client: context error: %s\n", err)
return nil, err
}
rsp, err := d.client.Recv()
if errors.Is(err, io.EOF) {
return nil, err
}
if grpcStatus.Code(err) == grpcCodes.Canceled {
return nil, err
}
if err != nil {
klog.Errorf("client: error receiving result: %s", err)
return nil, err
}
if rsp.Entity == nil {
klog.Errorf("client: received nil entity\n")
continue decode
}
event := resource.WriteEvent{
Key: &resource.ResourceKey{
Group: rsp.Entity.Namespace,
Resource: rsp.Entity.Resource,
Namespace: rsp.Entity.Namespace,
Name: rsp.Entity.Name,
},
}
switch rsp.Entity.Action {
case entity.Entity_CREATED:
event.Type = resource.WatchEvent_ADDED
case entity.Entity_UPDATED:
event.Type = resource.WatchEvent_MODIFIED
case entity.Entity_DELETED:
event.Type = resource.WatchEvent_DELETED
default:
klog.Errorf("unsupported action\n")
continue decode
}
// Now decode the bytes into an object
obj := &unstructured.Unstructured{}
err = entitystore.EntityToRuntimeObject(rsp.Entity, obj, unstructured.UnstructuredJSONScheme)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return nil, err
}
event.Value, err = obj.MarshalJSON()
if err != nil {
return nil, err
}
event.Object, err = utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
// Decode the old value
if rsp.Previous != nil {
err = entitystore.EntityToRuntimeObject(rsp.Previous, obj, unstructured.UnstructuredJSONScheme)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return nil, err
}
event.ObjectOld, err = utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
event.PreviousRV, err = event.ObjectOld.GetResourceVersionInt64()
if err != nil {
return nil, err
}
}
return &resource.WrittenEvent{
ResourceVersion: rsp.Entity.ResourceVersion,
Timestamp: time.Now().UnixMilli(),
WriteEvent: event,
}, nil
}
}
func (d *decoder) close() {
err := d.client.CloseSend()
if err != nil {
klog.Errorf("error closing watch stream: %s", err)
}
}

View File

@@ -1,5 +0,0 @@
// Package entitybridge implements an ResourceServer using existing EntityAPI contracts
//
// This package will be removed and replaced with a more streamlined SQL implementation
// that leverages what we have learned from the entity deployments so far
package entitybridge

View File

@@ -1,242 +0,0 @@
package entitybridge
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/klog/v2"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// Creates a ResourceServer using the existing entity tables
// NOTE: the server is optional and only used to pass init+close functions
func EntityAsResourceServer(client entity.EntityStoreClient, server sqlstash.SqlEntityServer, tracer tracing.Tracer) (resource.ResourceServer, error) {
if client == nil {
return nil, fmt.Errorf("client must be defined")
}
// Use this bridge as the resource store
bridge := &entityBridge{
client: client,
server: server,
}
return resource.NewResourceServer(resource.ResourceServerOptions{
Tracer: tracer,
Backend: bridge,
Diagnostics: bridge,
Lifecycle: bridge,
})
}
// This is only created if we use the entity implementation
type entityBridge struct {
client entity.EntityStoreClient
// When running directly
// (we need the explicit version so we have access to init+stop)
server sqlstash.SqlEntityServer
}
// Init implements ResourceServer.
func (b *entityBridge) Init(context.Context) error {
if b.server != nil {
return b.server.Init()
}
return nil
}
// Stop implements ResourceServer.
func (b *entityBridge) Stop(context.Context) error {
if b.server != nil {
b.server.Stop()
}
return nil
}
// Convert resource key to the entity key
func toEntityKey(key *resource.ResourceKey) string {
e := grafanaregistry.Key{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Name: key.Name,
}
return e.String()
}
func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) {
key := toEntityKey(event.Key)
// Delete does not need to create an entity first
if event.Type == resource.WatchEvent_DELETED {
rsp, err := b.client.Delete(ctx, &entity.DeleteEntityRequest{
Key: key,
PreviousVersion: event.PreviousRV,
})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
}
gvr := event.Object.GetGroupVersionKind()
obj := event.Object
msg := &entity.Entity{
Key: key,
Group: event.Key.Group,
Resource: event.Key.Resource,
Namespace: event.Key.Namespace,
Name: event.Key.Name,
Guid: string(event.Object.GetUID()),
GroupVersion: gvr.Version,
Folder: obj.GetFolder(),
Body: event.Value,
Message: event.Object.GetMessage(),
Labels: obj.GetLabels(),
Size: int64(len(event.Value)),
}
switch event.Type {
case resource.WatchEvent_ADDED:
msg.Action = entity.Entity_CREATED
rsp, err := b.client.Create(ctx, &entity.CreateEntityRequest{Entity: msg})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
case resource.WatchEvent_MODIFIED:
msg.Action = entity.Entity_UPDATED
rsp, err := b.client.Update(ctx, &entity.UpdateEntityRequest{
Entity: msg,
PreviousVersion: event.PreviousRV,
})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
default:
}
return 0, fmt.Errorf("unsupported operation: %s", event.Type.String())
}
func (b *entityBridge) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
client, err := b.client.Watch(ctx)
if err != nil {
return nil, err
}
req := &entity.EntityWatchRequest{
Action: entity.EntityWatchRequest_START,
Labels: map[string]string{},
WithBody: true,
WithStatus: true,
SendInitialEvents: false,
}
err = client.Send(req)
if err != nil {
err2 := client.CloseSend()
if err2 != nil {
klog.Errorf("watch close failed: %s\n", err2)
}
return nil, err
}
reader := &decoder{client}
stream := make(chan *resource.WrittenEvent, 10)
go func() {
for {
evt, err := reader.next()
if err != nil {
reader.close()
close(stream)
return
}
stream <- evt
}
}()
return stream, nil
}
// IsHealthy implements ResourceServer.
func (b *entityBridge) IsHealthy(ctx context.Context, req *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
rsp, err := b.client.IsHealthy(ctx, &entity.HealthCheckRequest{
Service: req.Service, // ??
})
if err != nil {
return nil, err
}
return &resource.HealthCheckResponse{
Status: resource.HealthCheckResponse_ServingStatus(rsp.Status),
}, nil
}
// Read implements ResourceServer.
func (b *entityBridge) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.ReadResponse {
v, err := b.client.Read(ctx, &entity.ReadEntityRequest{
Key: toEntityKey(req.Key),
WithBody: true,
})
if err != nil {
return &resource.ReadResponse{
Error: resource.AsErrorResult(err),
}
}
return &resource.ReadResponse{
ResourceVersion: v.ResourceVersion,
Value: v.Body,
}
}
// List implements ResourceServer.
func (b *entityBridge) PrepareList(ctx context.Context, req *resource.ListRequest) *resource.ListResponse {
key := req.Options.Key
query := &entity.EntityListRequest{
NextPageToken: req.NextPageToken,
Limit: req.Limit,
Key: []string{toEntityKey(key)},
WithBody: true,
}
if len(req.Options.Labels) > 0 {
query.Labels = make(map[string]string)
for _, q := range req.Options.Labels {
// The entity structure only supports equals
// the rest will be processed handled by the upstream predicate
op := selection.Operator(q.Operator)
if op == selection.Equals || op == selection.DoubleEquals {
query.Labels[q.Key] = q.Values[0]
}
}
}
found, err := b.client.List(ctx, query)
if err != nil {
return &resource.ListResponse{
Error: resource.AsErrorResult(err),
}
}
rsp := &resource.ListResponse{
ResourceVersion: found.ResourceVersion,
NextPageToken: found.NextPageToken,
}
for _, item := range found.Results {
rsp.Items = append(rsp.Items, &resource.ResourceWrapper{
ResourceVersion: item.ResourceVersion,
Value: item.Body,
})
}
return rsp
}