entity bridge

This commit is contained in:
Ryan McKinley 2024-06-14 22:53:10 +03:00
parent d5f3038421
commit d8abf8d851
13 changed files with 215 additions and 24 deletions

View File

@ -941,6 +941,7 @@ github.com/grafana/grafana-plugin-sdk-go v0.228.0/go.mod h1:u4K9vVN6eU86loO68977
github.com/grafana/grafana-plugin-sdk-go v0.229.0/go.mod h1:6V6ikT4ryva8MrAp7Bdz5fTJx3/ztzKvpMJFfpzr4CI=
github.com/grafana/grafana-plugin-sdk-go v0.231.1-0.20240523124942-62dae9836284/go.mod h1:bNgmNmub1I7Mc8dzIncgNqHC5jTgSZPPHlZ3aG8HKJQ=
github.com/grafana/grafana-plugin-sdk-go v0.234.0/go.mod h1:FlXjmBESxaD6Hoi8ojWLkH007nyjtJM3XC8SpwzF/YE=
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240613114114-5e2f08de316d/go.mod h1:adT8O7k6ZSzUKjAC4WS6VfWlCE4G1VavPwSXVhvScCs=
github.com/grafana/grafana/pkg/promlib v0.0.3/go.mod h1:3El4NlsfALz8QQCbEGHGFvJUG+538QLMuALRhZ3pcoo=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@ -949,6 +950,9 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3Kp
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/hack-pad/go-indexeddb v0.3.2/go.mod h1:QvfTevpDVlkfomY498LhstjwbPW6QC4VC/lxYb0Kom0=
github.com/hack-pad/hackpadfs v0.2.1/go.mod h1:khQBuCEwGXWakkmq8ZiFUvUZz84ZkJ2KNwKvChs4OrU=
github.com/hack-pad/safejs v0.1.0/go.mod h1:HdS+bKF1NrE72VoXZeWzxFOVQVUSqZJAG0xNCnb+Tio=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hamba/avro/v2 v2.17.2 h1:6PKpEWzJfNnvBgn7m2/8WYaDOUASxfDU+Jyb4ojDgFY=
github.com/hamba/avro/v2 v2.17.2/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=

View File

@ -0,0 +1,200 @@
package sqlstash
import (
"context"
"fmt"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// Creates a ResourceServer using the existing entity tables
// NOTE: most of the field values are ignored
func ProvideEnityStoreResources(db db.EntityDBInterface, tracer tracing.Tracer) (resource.ResourceServer, error) {
entity, err := sqlstash.ProvideSQLEntityServer(db, tracer)
if err != nil {
return nil, err
}
store := &entityBridge{
entity: entity,
}
return resource.NewResourceServer(resource.ResourceServerOptions{
Tracer: tracer,
Store: store,
NodeID: 234, // from config? used for snowflake ID
Diagnostics: store,
Lifecycle: store,
})
}
type entityBridge struct {
entity sqlstash.SqlEntityServer
}
// Init implements ResourceServer.
func (b *entityBridge) Init() error {
return b.entity.Init()
}
// Stop implements ResourceServer.
func (b *entityBridge) Stop() {
b.entity.Stop()
}
// Convert resource key to the entity key
func toEntityKey(key *resource.ResourceKey) string {
e := entity.Key{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Name: key.Name,
}
return e.String()
}
func (b *entityBridge) WriteEvent(ctx context.Context, event *resource.WriteEvent) (int64, error) {
key := toEntityKey(event.Key)
// Delete does not need to create an entity first
if event.Operation == resource.ResourceOperation_DELETED {
rsp, err := b.entity.Delete(ctx, &entity.DeleteEntityRequest{
Key: key,
PreviousVersion: event.PreviousRV,
})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
}
obj := event.Object
msg := &entity.Entity{
Key: key,
Group: event.Key.Group,
Resource: event.Key.Resource,
Namespace: event.Key.Namespace,
Name: event.Key.Name,
Guid: string(event.Object.GetUID()),
// Key: fmt.Sprint("%s/%s/%s/%s", ),
Folder: obj.GetFolder(),
Body: event.Value,
Message: event.Message,
Labels: obj.GetLabels(),
Size: int64(len(event.Value)),
}
switch event.Operation {
case resource.ResourceOperation_CREATED:
msg.Action = entity.Entity_CREATED
rsp, err := b.entity.Create(ctx, &entity.CreateEntityRequest{Entity: msg})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
case resource.ResourceOperation_UPDATED:
msg.Action = entity.Entity_UPDATED
rsp, err := b.entity.Update(ctx, &entity.UpdateEntityRequest{
Entity: msg,
PreviousVersion: event.PreviousRV,
})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
}
return 0, fmt.Errorf("unsupported operation: %s", event.Operation.String())
}
func (b *entityBridge) Watch(ctx context.Context, req *resource.WatchRequest) (chan *resource.WatchEvent, error) {
return nil, resource.ErrNotImplementedYet
}
// IsHealthy implements ResourceServer.
func (b *entityBridge) IsHealthy(ctx context.Context, req *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
rsp, err := b.entity.IsHealthy(ctx, &entity.HealthCheckRequest{
Service: req.Service, // ??
})
if err != nil {
return nil, err
}
return &resource.HealthCheckResponse{
Status: resource.HealthCheckResponse_ServingStatus(rsp.Status),
}, nil
}
// Read implements ResourceServer.
func (b *entityBridge) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
v, err := b.entity.Read(ctx, &entity.ReadEntityRequest{
Key: toEntityKey(req.Key),
WithBody: true,
})
if err != nil {
return nil, err
}
return &resource.ReadResponse{
ResourceVersion: v.ResourceVersion,
Value: v.Body,
Message: v.Message,
}, nil
}
// List implements ResourceServer.
func (b *entityBridge) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
key := req.Options.Key
query := &entity.EntityListRequest{
NextPageToken: req.NextPageToken,
Limit: req.Limit,
Key: []string{toEntityKey(key)},
WithBody: true,
}
// Assumes everything is equals
if len(req.Options.Labels) > 0 {
query.Labels = make(map[string]string)
for _, q := range req.Options.Labels {
query.Labels[q.Key] = q.Values[0]
}
}
found, err := b.entity.List(ctx, query)
if err != nil {
return nil, err
}
rsp := &resource.ListResponse{
ResourceVersion: found.ResourceVersion,
NextPageToken: found.NextPageToken,
}
for _, item := range found.Results {
rsp.Items = append(rsp.Items, &resource.ResourceWrapper{
ResourceVersion: item.ResourceVersion,
Value: item.Body,
Operation: resource.ResourceOperation(item.Action),
})
}
return rsp, nil
}
// GetBlob implements ResourceServer.
func (b *entityBridge) GetBlob(context.Context, *resource.GetBlobRequest) (*resource.GetBlobResponse, error) {
return nil, resource.ErrNotImplementedYet
}
// History implements ResourceServer.
func (b *entityBridge) History(context.Context, *resource.HistoryRequest) (*resource.HistoryResponse, error) {
return nil, resource.ErrNotImplementedYet
}
// Origin implements ResourceServer.
func (b *entityBridge) Origin(context.Context, *resource.OriginRequest) (*resource.OriginResponse, error) {
return nil, resource.ErrNotImplementedYet
}

View File

@ -442,6 +442,15 @@ func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err
return nil, err
}
if req.Key.Group == "" {
status, _ := errToStatus(apierrors.NewBadRequest("missing group"))
return &ReadResponse{Status: status}, nil
}
if req.Key.Resource == "" {
status, _ := errToStatus(apierrors.NewBadRequest("missing resource"))
return &ReadResponse{Status: status}, nil
}
rsp, err := s.store.Read(ctx, req)
if err != nil {
if rsp == nil {

View File

@ -1,4 +1,4 @@
package sqlstash
package sqlnext
import (
"context"
@ -152,13 +152,6 @@ func (s *sqlResourceStore) Read(ctx context.Context, req *resource.ReadRequest)
_, span := s.tracer.Start(ctx, "storage_server.GetResource")
defer span.End()
if req.Key.Group == "" {
return &resource.ReadResponse{Status: badRequest("missing group")}, nil
}
if req.Key.Resource == "" {
return &resource.ReadResponse{Status: badRequest("missing resource")}, nil
}
fmt.Printf("TODO, GET: %+v", req.Key)
return nil, ErrNotImplementedYet

View File

@ -1,4 +1,4 @@
package sqlstash
package sqlnext
import (
"context"

View File

@ -1,15 +0,0 @@
package sqlstash
import (
"net/http"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
func badRequest(msg string) *resource.StatusResult {
return &resource.StatusResult{
Status: "Failure",
Message: msg,
Code: http.StatusBadRequest,
}
}