ResourceServer: Add resource server protobuf and wrapper (#90007)

This commit is contained in:
Ryan McKinley 2024-07-09 15:08:13 -07:00 committed by GitHub
parent 05ce16cf7b
commit 079f0715aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 5852 additions and 62 deletions

View File

@ -386,6 +386,7 @@ protobuf: ## Compile protobuf definitions
buf generate pkg/plugins/backendplugin/pluginextensionv2 --template pkg/plugins/backendplugin/pluginextensionv2/buf.gen.yaml
buf generate pkg/plugins/backendplugin/secretsmanagerplugin --template pkg/plugins/backendplugin/secretsmanagerplugin/buf.gen.yaml
buf generate pkg/services/store/entity --template pkg/services/store/entity/buf.gen.yaml
buf generate pkg/storage/unified/resource --template pkg/storage/unified/resource/buf.gen.yaml
.PHONY: clean
clean: ## Clean up intermediate build artifacts.

View File

@ -27,7 +27,6 @@ cloud.google.com/go/channel v1.17.5 h1:/omiBnyFjm4S1ETHoOmJbL7LH7Ljcei4rYG6Sj3hc
cloud.google.com/go/cloudbuild v1.15.1 h1:ZB6oOmJo+MTov9n629fiCrO9YZPOg25FZvQ7gIHu5ng=
cloud.google.com/go/clouddms v1.7.4 h1:Sr0Zo5EAcPQiCBgHWICg3VGkcdS/LLP1d9SR7qQBM/s=
cloud.google.com/go/cloudtasks v1.12.6 h1:EUt1hIZ9bLv8Iz9yWaCrqgMnIU+Tdh0yXM1MMVGhjfE=
cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU=
cloud.google.com/go/compute v1.25.1/go.mod h1:oopOIR53ly6viBYxaDhBfJwzUAxf1zE//uf3IB011ls=
cloud.google.com/go/contactcenterinsights v1.13.0 h1:6Vs/YnDG5STGjlWMEjN/xtmft7MrOTOnOZYUZtGTx0w=
cloud.google.com/go/container v1.31.0 h1:MAaNH7VRNPWEhvqOypq2j+7ONJKrKzon4v9nS3nLZe0=
@ -854,11 +853,13 @@ go.opentelemetry.io/collector/service v0.95.0/go.mod h1:4yappQmDE5UZmLE9wwtj6IPM
go.opentelemetry.io/contrib/config v0.4.0 h1:Xb+ncYOqseLroMuBesGNRgVQolXcXOhMj7EhGwJCdHs=
go.opentelemetry.io/contrib/config v0.4.0/go.mod h1:drNk2xRqLWW4/amk6Uh1S+sDAJTc7bcEEN1GfJzj418=
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.51.0/go.mod h1:ZvX/taFlN6TGaOOM6D42wrNwPKUV1nGO2FuUXkityBU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc=
go.opentelemetry.io/contrib/propagators/b3 v1.15.0 h1:bMaonPyFcAvZ4EVzkUNkfnUHP5Zi63CIDlA3dRsEg8Q=
go.opentelemetry.io/contrib/propagators/b3 v1.15.0/go.mod h1:VjU0g2v6HSQ+NwfifambSLAeBgevjIcqmceaKWEzl0c=
go.opentelemetry.io/contrib/propagators/b3 v1.23.0 h1:aaIGWc5JdfRGpCafLRxMJbD65MfTa206AwSKkvGS0Hg=
go.opentelemetry.io/contrib/propagators/b3 v1.23.0/go.mod h1:Gyz7V7XghvwTq+mIhLFlTgcc03UDroOg8vezs4NLhwU=
go.opentelemetry.io/contrib/propagators/jaeger v1.26.0/go.mod h1:W/cylm0ZtJK1uxsuTqoYGYPnqpZ8CeVGgW7TwfXPsGw=
go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4=
go.opentelemetry.io/otel/bridge/opencensus v0.37.0 h1:ieH3gw7b1eg90ARsFAlAsX5LKVZgnCYfaDwRrK6xLHU=
go.opentelemetry.io/otel/bridge/opencensus v0.37.0/go.mod h1:ddiK+1PE68l/Xk04BGTh9Y6WIcxcLrmcVxVlS0w5WZ0=
go.opentelemetry.io/otel/bridge/opencensus v1.26.0 h1:DZzxj9QjznMVoehskOJnFP2gsTCWtDTFBDvFhPAY7nc=
@ -883,6 +884,7 @@ go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh4
go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI=
go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZHcAyHw5aU9Y=
go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE=
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
golang.org/x/arch v0.4.0 h1:A8WCeEWhLwPBKNbFi5Wv5UTCBx5zzubnXDlMOFAzFMc=
@ -899,7 +901,10 @@ gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPj
gonum.org/v1/plot v0.10.1 h1:dnifSs43YJuNMDzB7v8wV64O4ABBHReuAVAoBxqBqS4=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa h1:wBkzraZsSqhj1M4L/nMrljUU6XasJkgHvUsq8oRGwF0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE=
google.golang.org/protobuf v1.34.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/cheggaaa/pb.v1 v1.0.25 h1:Ev7yu1/f6+d+b3pi5vPdRPc6nNtP1umSfcWiEfRqv6I=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=

View File

@ -1,9 +1,12 @@
package utils
import (
"bytes"
"fmt"
"mime"
"reflect"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/meta"
@ -20,6 +23,8 @@ const AnnoKeyUpdatedTimestamp = "grafana.app/updatedTimestamp"
const AnnoKeyUpdatedBy = "grafana.app/updatedBy"
const AnnoKeyFolder = "grafana.app/folder"
const AnnoKeySlug = "grafana.app/slug"
const AnnoKeyBlob = "grafana.app/blob"
const AnnoKeyMessage = "grafana.app/message"
// Identify where values came from
@ -53,6 +58,7 @@ type GrafanaMetaAccessor interface {
metav1.Object
GetGroupVersionKind() schema.GroupVersionKind
GetRuntimeObject() (runtime.Object, bool)
// Helper to get resource versions as int64, however this is not required
// See: https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions
@ -68,11 +74,16 @@ type GrafanaMetaAccessor interface {
SetUpdatedBy(user string)
GetFolder() string
SetFolder(uid string)
GetMessage() string
SetMessage(msg string)
SetAnnotation(key string, val string)
GetSlug() string
SetSlug(v string)
SetBlob(v *BlobInfo)
GetBlob() *BlobInfo
GetOriginInfo() (*ResourceOriginInfo, error)
SetOriginInfo(info *ResourceOriginInfo)
GetOriginName() string
@ -80,6 +91,8 @@ type GrafanaMetaAccessor interface {
GetOriginHash() string
GetOriginTimestamp() (*time.Time, error)
GetSpec() (any, error)
// Find a title in the object
// This will reflect the object and try to get:
// * spec.title
@ -123,6 +136,11 @@ func (m *grafanaMetaAccessor) GetResourceVersionInt64() (int64, error) {
return strconv.ParseInt(v, 10, 64)
}
func (m *grafanaMetaAccessor) GetRuntimeObject() (runtime.Object, bool) {
obj, ok := m.raw.(runtime.Object)
return obj, ok
}
func (m *grafanaMetaAccessor) SetResourceVersionInt64(rv int64) {
m.obj.SetResourceVersion(strconv.FormatInt(rv, 10))
}
@ -192,6 +210,17 @@ func (m *grafanaMetaAccessor) SetUpdatedBy(user string) {
m.SetAnnotation(AnnoKeyUpdatedBy, user)
}
func (m *grafanaMetaAccessor) GetBlob() *BlobInfo {
return ParseBlobInfo(m.get(AnnoKeyBlob))
}
func (m *grafanaMetaAccessor) SetBlob(info *BlobInfo) {
if info == nil {
m.SetAnnotation(AnnoKeyBlob, "") // delete
}
m.SetAnnotation(AnnoKeyBlob, info.String())
}
func (m *grafanaMetaAccessor) GetFolder() string {
return m.get(AnnoKeyFolder)
}
@ -200,6 +229,14 @@ func (m *grafanaMetaAccessor) SetFolder(uid string) {
m.SetAnnotation(AnnoKeyFolder, uid)
}
func (m *grafanaMetaAccessor) GetMessage() string {
return m.get(AnnoKeyMessage)
}
func (m *grafanaMetaAccessor) SetMessage(uid string) {
m.SetAnnotation(AnnoKeyMessage, uid)
}
func (m *grafanaMetaAccessor) GetSlug() string {
return m.get(AnnoKeySlug)
}
@ -457,6 +494,16 @@ func (m *grafanaMetaAccessor) GetGroupVersionKind() schema.GroupVersionKind {
return gvk
}
func (m *grafanaMetaAccessor) GetSpec() (spec any, err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("error reading spec")
}
}()
spec = m.r.FieldByName("Spec").Interface()
return
}
func (m *grafanaMetaAccessor) FindTitle(defaultTitle string) string {
// look for Spec.Title or Spec.Name
spec := m.r.FieldByName("Spec")
@ -477,3 +524,82 @@ func (m *grafanaMetaAccessor) FindTitle(defaultTitle string) string {
}
return defaultTitle
}
type BlobInfo struct {
UID string `json:"uid"`
Size int64 `json:"size,omitempty"`
Hash string `json:"hash,omitempty"`
MimeType string `json:"mime,omitempty"`
Charset string `json:"charset,omitempty"` // content type = mime+charset
}
// Content type is mime + charset
func (b *BlobInfo) SetContentType(v string) {
var params map[string]string
var err error
b.Charset = ""
b.MimeType, params, err = mime.ParseMediaType(v)
if err != nil {
return
}
b.Charset = params["charset"]
}
// Content type is mime + charset
func (b *BlobInfo) ContentType() string {
sb := bytes.NewBufferString(b.MimeType)
if b.Charset != "" {
sb.WriteString("; charset=")
sb.WriteString(b.Charset)
}
return sb.String()
}
func (b *BlobInfo) String() string {
sb := bytes.NewBufferString(b.UID)
if b.Size > 0 {
sb.WriteString(fmt.Sprintf("; size=%d", b.Size))
}
if b.Hash != "" {
sb.WriteString("; hash=")
sb.WriteString(b.Hash)
}
if b.MimeType != "" {
sb.WriteString("; mime=")
sb.WriteString(b.MimeType)
}
if b.Charset != "" {
sb.WriteString("; charset=")
sb.WriteString(b.Charset)
}
return sb.String()
}
func ParseBlobInfo(v string) *BlobInfo {
if v == "" {
return nil
}
info := &BlobInfo{}
for i, part := range strings.Split(v, ";") {
if i == 0 {
info.UID = part
continue
}
kv := strings.Split(strings.TrimSpace(part), "=")
if len(kv) == 2 {
val := kv[1]
switch kv[0] {
case "size":
info.Size, _ = strconv.ParseInt(val, 10, 64)
case "hash":
info.Hash = val
case "mime":
info.MimeType = val
case "charset":
info.Charset = val
}
}
}
return info
}

View File

@ -172,6 +172,14 @@ func TestMetaAccessor(t *testing.T) {
require.Equal(t, int64(12345), rv)
})
t.Run("blob info", func(t *testing.T) {
info := &utils.BlobInfo{UID: "AAA", Size: 123, Hash: "xyz", MimeType: "application/json", Charset: "utf-8"}
anno := info.String()
require.Equal(t, "AAA; size=123; hash=xyz; mime=application/json; charset=utf-8", anno)
copy := utils.ParseBlobInfo(anno)
require.Equal(t, info, copy)
})
t.Run("find titles", func(t *testing.T) {
// with a k8s object that has Spec.Title
obj := &TestResource{
@ -220,5 +228,13 @@ func TestMetaAccessor(t *testing.T) {
}, obj2.GetAnnotations())
require.Equal(t, "xxx", meta.FindTitle("xxx"))
rt, ok := meta.GetRuntimeObject()
require.Equal(t, obj2, rt)
require.True(t, ok)
spec, err := meta.GetSpec()
require.Equal(t, obj2.Spec, spec)
require.NoError(t, err)
})
}

View File

@ -4,20 +4,23 @@ import (
"fmt"
"net"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
"github.com/spf13/pflag"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options"
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest"
)
type StorageType string
const (
StorageTypeFile StorageType = "file"
StorageTypeEtcd StorageType = "etcd"
StorageTypeLegacy StorageType = "legacy"
StorageTypeUnified StorageType = "unified"
StorageTypeUnifiedGrpc StorageType = "unified-grpc"
StorageTypeFile StorageType = "file"
StorageTypeEtcd StorageType = "etcd"
StorageTypeLegacy StorageType = "legacy"
StorageTypeUnified StorageType = "unified"
StorageTypeUnifiedGrpc StorageType = "unified-grpc"
StorageTypeUnifiedNext StorageType = "unified-next"
StorageTypeUnifiedNextGrpc StorageType = "unified-next-grpc"
)
type StorageOptions struct {
@ -43,10 +46,10 @@ func (o *StorageOptions) AddFlags(fs *pflag.FlagSet) {
func (o *StorageOptions) Validate() []error {
errs := []error{}
switch o.StorageType {
case StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc:
case StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc, StorageTypeUnifiedNext, StorageTypeUnifiedNextGrpc:
// no-op
default:
errs = append(errs, fmt.Errorf("--grafana-apiserver-storage-type must be one of %s, %s, %s, %s, %s", StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc))
errs = append(errs, fmt.Errorf("--grafana-apiserver-storage-type must be one of %s, %s, %s, %s, %s, %s, %s", StorageTypeFile, StorageTypeEtcd, StorageTypeLegacy, StorageTypeUnified, StorageTypeUnifiedGrpc, StorageTypeUnifiedNext, StorageTypeUnifiedNextGrpc))
}
if _, _, err := net.SplitHostPort(o.Address); err != nil {

View File

@ -4,10 +4,13 @@ import (
"context"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"gocloud.dev/blob/fileblob"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -44,6 +47,9 @@ import (
"github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/apistore"
"github.com/grafana/grafana/pkg/storage/unified/entitybridge"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var (
@ -199,6 +205,7 @@ func (s *service) RegisterAPI(b builder.APIGroupBuilder) {
s.builders = append(s.builders, b)
}
// nolint:gocyclo
func (s *service) start(ctx context.Context) error {
defer close(s.startedCh)
@ -258,39 +265,87 @@ func (s *service) start(ctx context.Context) error {
return err
}
case grafanaapiserveroptions.StorageTypeUnified:
case grafanaapiserveroptions.StorageTypeUnifiedNext:
// CDK (for now)
dir := filepath.Join(s.cfg.DataPath, "unistore", "resource")
if err := os.MkdirAll(dir, 0o750); err != nil {
return err
}
bucket, err := fileblob.OpenBucket(dir, &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
if err != nil {
return err
}
backend, err := resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{
Tracer: s.tracing,
Bucket: bucket,
})
if err != nil {
return err
}
server, err := resource.NewResourceServer(resource.ResourceServerOptions{Backend: backend})
if err != nil {
return err
}
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForServer(server,
o.RecommendedOptions.Etcd.StorageConfig.Codec)
case grafanaapiserveroptions.StorageTypeUnifiedNextGrpc:
if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) {
return fmt.Errorf("unified storage requires the unifiedStorage feature flag")
}
eDB, err := dbimpl.ProvideEntityDB(s.db, s.cfg, s.features, s.tracing)
if err != nil {
return err
}
storeServer, err := sqlstash.ProvideSQLEntityServer(eDB, s.tracing)
if err != nil {
return err
}
store := entity.NewEntityStoreClientLocal(storeServer)
serverConfig.Config.RESTOptionsGetter = entitystorage.NewRESTOptionsGetter(s.cfg, store, o.RecommendedOptions.Etcd.StorageConfig.Codec)
case grafanaapiserveroptions.StorageTypeUnifiedGrpc:
// Create a connection to the gRPC server
conn, err := grpc.NewClient(o.StorageOptions.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
// TODO: determine when to close the connection, we cannot defer it here
// defer conn.Close()
// Create a client instance
store := entity.NewEntityStoreClientGRPC(conn)
client := resource.NewResourceStoreClientGRPC(conn)
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetter(client, o.RecommendedOptions.Etcd.StorageConfig.Codec)
serverConfig.Config.RESTOptionsGetter = entitystorage.NewRESTOptionsGetter(s.cfg, store, o.RecommendedOptions.Etcd.StorageConfig.Codec)
case grafanaapiserveroptions.StorageTypeUnified, grafanaapiserveroptions.StorageTypeUnifiedGrpc:
var client entity.EntityStoreClient
var entityServer sqlstash.SqlEntityServer
if o.StorageOptions.StorageType == grafanaapiserveroptions.StorageTypeUnifiedGrpc {
conn, err := grpc.NewClient(o.StorageOptions.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
client = entity.NewEntityStoreClientGRPC(conn)
} else {
if !s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorage) {
return fmt.Errorf("unified storage requires the unifiedStorage feature flag")
}
eDB, err := dbimpl.ProvideEntityDB(s.db, s.cfg, s.features, s.tracing)
if err != nil {
return err
}
entityServer, err = sqlstash.ProvideSQLEntityServer(eDB, s.tracing)
if err != nil {
return err
}
client = entity.NewEntityStoreClientLocal(entityServer)
}
if false {
// Use the entity bridge
server, err := entitybridge.EntityAsResourceServer(client, entityServer, s.tracing)
if err != nil {
return err
}
serverConfig.Config.RESTOptionsGetter = apistore.NewRESTOptionsGetterForServer(server,
o.RecommendedOptions.Etcd.StorageConfig.Codec)
} else {
serverConfig.Config.RESTOptionsGetter = entitystorage.NewRESTOptionsGetter(s.cfg,
client, o.RecommendedOptions.Etcd.StorageConfig.Codec)
}
case grafanaapiserveroptions.StorageTypeLegacy:
fallthrough

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc-gen-go v1.34.1
// protoc (unknown)
// source: entity.proto
@ -1401,7 +1401,7 @@ type EntityListRequest struct {
WithStatus bool `protobuf:"varint,10,opt,name=with_status,json=withStatus,proto3" json:"with_status,omitempty"`
// list deleted entities instead of active ones
Deleted bool `protobuf:"varint,12,opt,name=deleted,proto3" json:"deleted,omitempty"`
// Limit to a set of origin keys (empty is all)
// Deprecated: Limit to a set of origin keys (empty is all)
OriginKeys []string `protobuf:"bytes,13,rep,name=origin_keys,json=originKeys,proto3" json:"origin_keys,omitempty"`
}

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc-gen-go-grpc v1.4.0
// - protoc (unknown)
// source: entity.proto
@ -15,8 +15,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
EntityStore_Read_FullMethodName = "/entity.EntityStore/Read"
@ -32,6 +32,8 @@ const (
// EntityStoreClient is the client API for EntityStore service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
type EntityStoreClient interface {
Read(ctx context.Context, in *ReadEntityRequest, opts ...grpc.CallOption) (*Entity, error)
Create(ctx context.Context, in *CreateEntityRequest, opts ...grpc.CallOption) (*CreateEntityResponse, error)
@ -52,8 +54,9 @@ func NewEntityStoreClient(cc grpc.ClientConnInterface) EntityStoreClient {
}
func (c *entityStoreClient) Read(ctx context.Context, in *ReadEntityRequest, opts ...grpc.CallOption) (*Entity, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(Entity)
err := c.cc.Invoke(ctx, EntityStore_Read_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, EntityStore_Read_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -61,8 +64,9 @@ func (c *entityStoreClient) Read(ctx context.Context, in *ReadEntityRequest, opt
}
func (c *entityStoreClient) Create(ctx context.Context, in *CreateEntityRequest, opts ...grpc.CallOption) (*CreateEntityResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CreateEntityResponse)
err := c.cc.Invoke(ctx, EntityStore_Create_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, EntityStore_Create_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -70,8 +74,9 @@ func (c *entityStoreClient) Create(ctx context.Context, in *CreateEntityRequest,
}
func (c *entityStoreClient) Update(ctx context.Context, in *UpdateEntityRequest, opts ...grpc.CallOption) (*UpdateEntityResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(UpdateEntityResponse)
err := c.cc.Invoke(ctx, EntityStore_Update_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, EntityStore_Update_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -79,8 +84,9 @@ func (c *entityStoreClient) Update(ctx context.Context, in *UpdateEntityRequest,
}
func (c *entityStoreClient) Delete(ctx context.Context, in *DeleteEntityRequest, opts ...grpc.CallOption) (*DeleteEntityResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(DeleteEntityResponse)
err := c.cc.Invoke(ctx, EntityStore_Delete_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, EntityStore_Delete_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -88,8 +94,9 @@ func (c *entityStoreClient) Delete(ctx context.Context, in *DeleteEntityRequest,
}
func (c *entityStoreClient) History(ctx context.Context, in *EntityHistoryRequest, opts ...grpc.CallOption) (*EntityHistoryResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(EntityHistoryResponse)
err := c.cc.Invoke(ctx, EntityStore_History_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, EntityStore_History_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -97,8 +104,9 @@ func (c *entityStoreClient) History(ctx context.Context, in *EntityHistoryReques
}
func (c *entityStoreClient) List(ctx context.Context, in *EntityListRequest, opts ...grpc.CallOption) (*EntityListResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(EntityListResponse)
err := c.cc.Invoke(ctx, EntityStore_List_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, EntityStore_List_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -106,11 +114,12 @@ func (c *entityStoreClient) List(ctx context.Context, in *EntityListRequest, opt
}
func (c *entityStoreClient) Watch(ctx context.Context, opts ...grpc.CallOption) (EntityStore_WatchClient, error) {
stream, err := c.cc.NewStream(ctx, &EntityStore_ServiceDesc.Streams[0], EntityStore_Watch_FullMethodName, opts...)
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &EntityStore_ServiceDesc.Streams[0], EntityStore_Watch_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &entityStoreWatchClient{stream}
x := &entityStoreWatchClient{ClientStream: stream}
return x, nil
}
@ -137,8 +146,9 @@ func (x *entityStoreWatchClient) Recv() (*EntityWatchResponse, error) {
}
func (c *entityStoreClient) IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, EntityStore_IsHealthy_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, EntityStore_IsHealthy_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@ -148,6 +158,8 @@ func (c *entityStoreClient) IsHealthy(ctx context.Context, in *HealthCheckReques
// EntityStoreServer is the server API for EntityStore service.
// All implementations should embed UnimplementedEntityStoreServer
// for forward compatibility
//
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
type EntityStoreServer interface {
Read(context.Context, *ReadEntityRequest) (*Entity, error)
Create(context.Context, *CreateEntityRequest) (*CreateEntityResponse, error)
@ -308,7 +320,7 @@ func _EntityStore_List_Handler(srv interface{}, ctx context.Context, dec func(in
}
func _EntityStore_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(EntityStoreServer).Watch(&entityStoreWatchServer{stream})
return srv.(EntityStoreServer).Watch(&entityStoreWatchServer{ServerStream: stream})
}
type EntityStore_WatchServer interface {

View File

@ -655,6 +655,7 @@ func (s *sqlEntityServer) List(ctx context.Context, r *entity.EntityListRequest)
rvSubQuery.AddWhere("("+strings.Join(where, " OR ")+")", args...)
}
// nolint:staticcheck
if len(r.OriginKeys) > 0 {
entityQuery.AddWhereIn("origin_key", ToAnyList(r.OriginKeys))
rvMaxQuery.AddWhereIn("origin_key", ToAnyList(r.OriginKeys))

View File

@ -0,0 +1,5 @@
// Package apistore provides a kubernetes store.Interface for a ResourceServer
//
// This package is responsible for running all the apiserver specific logic
// before and after sending requests to the StorageServer
package apistore

View File

@ -0,0 +1,87 @@
// SPDX-License-Identifier: AGPL-3.0-only
package apistore
import (
"path"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/tools/cache"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
type RESTOptionsGetter struct {
client resource.ResourceStoreClient
Codec runtime.Codec
}
func NewRESTOptionsGetterForServer(server resource.ResourceServer, codec runtime.Codec) *RESTOptionsGetter {
return &RESTOptionsGetter{
client: resource.NewLocalResourceStoreClient(server),
Codec: codec,
}
}
func NewRESTOptionsGetter(client resource.ResourceStoreClient, codec runtime.Codec) *RESTOptionsGetter {
return &RESTOptionsGetter{
client: client,
Codec: codec,
}
}
func (f *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig := &storagebackend.ConfigForResource{
Config: storagebackend.Config{
Type: "custom",
Prefix: "",
Transport: storagebackend.TransportConfig{
ServerList: []string{
// ??? string(connectionInfo),
},
},
Codec: f.Codec,
EncodeVersioner: nil,
Transformer: nil,
CompactionInterval: 0,
CountMetricPollPeriod: 0,
DBMetricPollInterval: 0,
HealthcheckTimeout: 0,
ReadycheckTimeout: 0,
StorageObjectCountTracker: nil,
},
GroupResource: resource,
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: func(
config *storagebackend.ConfigForResource,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs,
indexers *cache.Indexers,
) (storage.Interface, factory.DestroyFunc, error) {
return NewStorage(config, resource, f.client, f.Codec, keyFunc, newFunc, newListFunc, getAttrsFunc)
},
DeleteCollectionWorkers: 0,
EnableGarbageCollection: false,
ResourcePrefix: path.Join(storageConfig.Prefix, resource.Group, resource.Resource),
CountMetricPollPeriod: 1 * time.Second,
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
}
return ret, nil
}

View File

@ -0,0 +1,531 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package apistore
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"reflect"
"strconv"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
var _ storage.Interface = (*Storage)(nil)
// Storage implements storage.Interface and stores resources in unified storage
type Storage struct {
config *storagebackend.ConfigForResource
store resource.ResourceStoreClient
gr schema.GroupResource
codec runtime.Codec
keyFunc func(obj runtime.Object) (string, error)
newFunc func() runtime.Object
newListFunc func() runtime.Object
getAttrsFunc storage.AttrFunc
// trigger storage.IndexerFuncs
// indexers *cache.Indexers
}
func NewStorage(
config *storagebackend.ConfigForResource,
gr schema.GroupResource,
store resource.ResourceStoreClient,
codec runtime.Codec,
keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
) (storage.Interface, factory.DestroyFunc, error) {
return &Storage{
config: config,
gr: gr,
codec: codec,
store: store,
keyFunc: keyFunc,
newFunc: newFunc,
newListFunc: newListFunc,
getAttrsFunc: getAttrsFunc,
}, nil, nil
}
func errorWrap(status *resource.StatusResult) error {
if status != nil {
return &apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Code: status.Code,
Reason: metav1.StatusReason(status.Reason),
Message: status.Message,
}}
}
return nil
}
func getKey(val string) (*resource.ResourceKey, error) {
k, err := grafanaregistry.ParseKey(val)
if err != nil {
return nil, err
}
// if k.Group == "" {
// return nil, apierrors.NewInternalError(fmt.Errorf("missing group in request"))
// }
if k.Resource == "" {
return nil, apierrors.NewInternalError(fmt.Errorf("missing resource in request"))
}
return &resource.ResourceKey{
Namespace: k.Namespace,
Group: k.Group,
Resource: k.Resource,
Name: k.Name,
}, err
}
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
k, err := getKey(key)
if err != nil {
return err
}
err = s.Versioner().PrepareObjectForStorage(obj)
if err != nil {
return err
}
var buf bytes.Buffer
err = s.codec.Encode(obj, &buf)
if err != nil {
return err
}
cmd := &resource.CreateRequest{
Key: k,
Value: buf.Bytes(),
}
// TODO?? blob from context?
rsp, err := s.store.Create(ctx, cmd)
if err != nil {
return err
}
err = errorWrap(rsp.Status)
if err != nil {
return err
}
if rsp.Status != nil {
return fmt.Errorf("error in status %+v", rsp.Status)
}
// Create into the out value
_, _, err = s.codec.Decode(rsp.Value, nil, out)
if err != nil {
return err
}
after, err := utils.MetaAccessor(out)
if err != nil {
return err
}
after.SetResourceVersionInt64(rsp.ResourceVersion)
return nil
}
// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
// current version of the object to avoid read operation from storage to get it.
// However, the implementations have to retry in case suggestion is stale.
func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
k, err := getKey(key)
if err != nil {
return err
}
// if validateDeletion != nil {
// return fmt.Errorf("not supported (validate deletion)")
// }
cmd := &resource.DeleteRequest{Key: k}
if preconditions != nil {
if preconditions.ResourceVersion != nil {
cmd.ResourceVersion, err = strconv.ParseInt(*preconditions.ResourceVersion, 10, 64)
if err != nil {
return err
}
}
if preconditions.UID != nil {
cmd.Uid = string(*preconditions.UID)
}
}
rsp, err := s.store.Delete(ctx, cmd)
if err != nil {
return err
}
err = errorWrap(rsp.Status)
if err != nil {
return err
}
return nil
}
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
listopts, _, err := toListRequest(key, opts)
if err != nil {
return nil, err
}
if listopts == nil {
return watch.NewEmptyWatch(), nil
}
cmd := &resource.WatchRequest{
Since: listopts.ResourceVersion,
Options: listopts.Options,
SendInitialEvents: false,
AllowWatchBookmarks: opts.Predicate.AllowWatchBookmarks,
}
if opts.SendInitialEvents != nil {
cmd.SendInitialEvents = *opts.SendInitialEvents
}
client, err := s.store.Watch(ctx, cmd)
if err != nil {
// if the context was canceled, just return a new empty watch
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) {
return watch.NewEmptyWatch(), nil
}
return nil, err
}
reporter := apierrors.NewClientErrorReporter(500, "WATCH", "")
decoder := &streamDecoder{
client: client,
newFunc: s.newFunc,
opts: opts,
codec: s.codec,
}
return watch.NewStreamWatcher(decoder, reporter), nil
}
// Get decodes object found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
var err error
req := &resource.ReadRequest{}
req.Key, err = getKey(key)
if err != nil {
return err
}
if opts.ResourceVersion != "" {
req.ResourceVersion, err = strconv.ParseInt(opts.ResourceVersion, 10, 64)
if err != nil {
return err
}
}
rsp, err := s.store.Read(ctx, req)
if err != nil {
return err
}
err = errorWrap(rsp.Status)
if err != nil {
return err
}
_, _, err = s.codec.Decode(rsp.Value, &schema.GroupVersionKind{}, objPtr)
if err != nil {
return err
}
obj, err := utils.MetaAccessor(objPtr)
if err != nil {
return err
}
obj.SetResourceVersionInt64(rsp.ResourceVersion)
return nil
}
func toListRequest(key string, opts storage.ListOptions) (*resource.ListRequest, storage.SelectionPredicate, error) {
predicate := opts.Predicate
k, err := getKey(key)
if err != nil {
return nil, predicate, err
}
req := &resource.ListRequest{
Limit: opts.Predicate.Limit,
Options: &resource.ListOptions{
Key: k,
},
NextPageToken: predicate.Continue,
}
if opts.Predicate.Label != nil && !opts.Predicate.Label.Empty() {
requirements, selectable := opts.Predicate.Label.Requirements()
if !selectable {
return nil, predicate, nil // not selectable
}
for _, r := range requirements {
v := r.Key()
req.Options.Labels = append(req.Options.Labels, &resource.Requirement{
Key: v,
Operator: string(r.Operator()),
Values: r.Values().List(),
})
}
}
if opts.Predicate.Field != nil && !opts.Predicate.Field.Empty() {
requirements := opts.Predicate.Field.Requirements()
for _, r := range requirements {
requirement := &resource.Requirement{Key: r.Field, Operator: string(r.Operator)}
if r.Value != "" {
requirement.Values = append(requirement.Values, r.Value)
}
req.Options.Labels = append(req.Options.Labels, requirement)
}
}
if opts.ResourceVersion != "" {
rv, err := strconv.ParseInt(opts.ResourceVersion, 10, 64)
if err != nil {
return nil, predicate, apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %s", opts.ResourceVersion))
}
req.ResourceVersion = rv
}
switch opts.ResourceVersionMatch {
case "", metav1.ResourceVersionMatchNotOlderThan:
req.VersionMatch = resource.ResourceVersionMatch_NotOlderThan
case metav1.ResourceVersionMatchExact:
req.VersionMatch = resource.ResourceVersionMatch_Exact
default:
return nil, predicate, apierrors.NewBadRequest(
fmt.Sprintf("unsupported version match: %v", opts.ResourceVersionMatch),
)
}
return req, predicate, nil
}
// GetList unmarshalls objects found at key into a *List api object (an object
// that satisfies runtime.IsList definition).
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
// is true, 'key' is used as a prefix.
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
req, predicate, err := toListRequest(key, opts)
if err != nil {
return err
}
rsp, err := s.store.List(ctx, req)
if err != nil {
return err
}
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
for _, item := range rsp.Items {
tmp := s.newFunc()
tmp, _, err = s.codec.Decode(item.Value, nil, tmp)
if err != nil {
return err
}
obj, err := utils.MetaAccessor(tmp)
if err != nil {
return err
}
obj.SetResourceVersionInt64(item.ResourceVersion)
// apply any predicates not handled in storage
matches, err := predicate.Matches(tmp)
if err != nil {
return apierrors.NewInternalError(err)
}
if !matches {
continue
}
v.Set(reflect.Append(v, reflect.ValueOf(tmp).Elem()))
}
listAccessor, err := meta.ListAccessor(listObj)
if err != nil {
return err
}
if rsp.NextPageToken != "" {
listAccessor.SetContinue(rsp.NextPageToken)
}
if rsp.RemainingItemCount > 0 {
listAccessor.SetRemainingItemCount(&rsp.RemainingItemCount)
}
if rsp.ResourceVersion > 0 {
listAccessor.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
}
return nil
}
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
// retrying the update until success if there is index conflict.
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
// else `destination` will be set to the zero value of it's type.
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
// contents.
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
// current version of the object to avoid read operation from storage to get it.
// However, the implementations have to retry in case suggestion is stale.
func (s *Storage) GuaranteedUpdate(
ctx context.Context,
key string,
destination runtime.Object,
ignoreNotFound bool,
preconditions *storage.Preconditions,
tryUpdate storage.UpdateFunc,
cachedExistingObject runtime.Object,
) error {
k, err := getKey(key)
if err != nil {
return err
}
// Get the current version
err = s.Get(ctx, key, storage.GetOptions{}, destination)
if err != nil {
if ignoreNotFound && apierrors.IsNotFound(err) {
// destination is already set to zero value
// we'll create the resource
} else {
return err
}
}
accessor, err := utils.MetaAccessor(destination)
if err != nil {
return err
}
// Early optimistic locking failure
previousVersion, _ := strconv.ParseInt(accessor.GetResourceVersion(), 10, 64)
if preconditions != nil {
if preconditions.ResourceVersion != nil {
rv, err := strconv.ParseInt(*preconditions.ResourceVersion, 10, 64)
if err != nil {
return err
}
if rv != previousVersion {
return fmt.Errorf("optimistic locking mismatch (previousVersion mismatch)")
}
}
if preconditions.UID != nil {
if accessor.GetUID() != *preconditions.UID {
return fmt.Errorf("optimistic locking mismatch (UID mismatch)")
}
}
}
res := &storage.ResponseMeta{}
updatedObj, _, err := tryUpdate(destination, *res)
if err != nil {
var statusErr *apierrors.StatusError
if errors.As(err, &statusErr) {
// For now, forbidden may come from a mutation handler
if statusErr.ErrStatus.Reason == metav1.StatusReasonForbidden {
return statusErr
}
}
return apierrors.NewInternalError(
fmt.Errorf("could not successfully update object. key=%s, err=%s", k.String(), err.Error()),
)
}
var buf bytes.Buffer
err = s.codec.Encode(updatedObj, &buf)
if err != nil {
return err
}
req := &resource.UpdateRequest{Key: k, Value: buf.Bytes()}
rsp, err := s.store.Update(ctx, req)
if err != nil {
return err
}
err = errorWrap(rsp.Status)
if err != nil {
return err
}
// Read the mutated fields the response field
_, _, err = s.codec.Decode(rsp.Value, nil, destination)
if err != nil {
return err
}
accessor, err = utils.MetaAccessor(destination)
if err != nil {
return err
}
accessor.SetResourceVersionInt64(rsp.ResourceVersion)
return nil
}
// Count returns number of different entries under the key (generally being path prefix).
func (s *Storage) Count(key string) (int64, error) {
return 0, nil
}
func (s *Storage) Versioner() storage.Versioner {
return &storage.APIObjectVersioner{}
}
func (s *Storage) RequestWatchProgress(ctx context.Context) error {
return nil
}

View File

@ -0,0 +1,203 @@
package apistore
import (
"errors"
"fmt"
"io"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
type streamDecoder struct {
client resource.ResourceStore_WatchClient
newFunc func() runtime.Object
opts storage.ListOptions
codec runtime.Codec
}
func (d *streamDecoder) toObject(w *resource.WatchEvent_Resource) (runtime.Object, error) {
obj, _, err := d.codec.Decode(w.Value, nil, d.newFunc())
if err == nil {
accessor, err := utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
accessor.SetResourceVersionInt64(w.Version)
}
return obj, err
}
func (d *streamDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
decode:
for {
err := d.client.Context().Err()
if err != nil {
klog.Errorf("client: context error: %s\n", err)
return watch.Error, nil, err
}
evt, err := d.client.Recv()
if errors.Is(err, io.EOF) {
return watch.Error, nil, err
}
if grpcStatus.Code(err) == grpcCodes.Canceled {
return watch.Error, nil, err
}
if err != nil {
klog.Errorf("client: error receiving result: %s", err)
return watch.Error, nil, err
}
// Error event
if evt.Type == resource.WatchEvent_ERROR {
err = fmt.Errorf("stream error")
klog.Errorf("client: error receiving result: %s", err)
return watch.Error, nil, err
}
if evt.Resource == nil {
klog.Errorf("client: received nil \n")
continue decode
}
if evt.Type == resource.WatchEvent_BOOKMARK {
obj := d.newFunc()
// here k8s expects an empty object with just resource version and k8s.io/initial-events-end annotation
accessor, err := utils.MetaAccessor(obj)
if err != nil {
klog.Errorf("error getting object accessor: %s", err)
return watch.Error, nil, err
}
accessor.SetResourceVersionInt64(evt.Resource.Version)
accessor.SetAnnotations(map[string]string{"k8s.io/initial-events-end": "true"})
return watch.Bookmark, obj, nil
}
obj, err := d.toObject(evt.Resource)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return watch.Error, nil, err
}
var watchAction watch.EventType
switch evt.Type {
case resource.WatchEvent_ADDED:
// apply any predicates not handled in storage
matches, err := d.opts.Predicate.Matches(obj)
if err != nil {
klog.Errorf("error matching object: %s", err)
return watch.Error, nil, err
}
if !matches {
continue decode
}
watchAction = watch.Added
case resource.WatchEvent_MODIFIED:
watchAction = watch.Modified
// apply any predicates not handled in storage
matches, err := d.opts.Predicate.Matches(obj)
if err != nil {
klog.Errorf("error matching object: %s", err)
return watch.Error, nil, err
}
// if we have a previous object, check if it matches
prevMatches := false
var prevObj runtime.Object
if evt.Previous != nil {
prevObj, err = d.toObject(evt.Previous)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return watch.Error, nil, err
}
// apply any predicates not handled in storage
prevMatches, err = d.opts.Predicate.Matches(prevObj)
if err != nil {
klog.Errorf("error matching object: %s", err)
return watch.Error, nil, err
}
}
if !matches {
if !prevMatches {
continue decode
}
// if the object didn't match, send a Deleted event
watchAction = watch.Deleted
// here k8s expects the previous object but with the new resource version
obj = prevObj
accessor, err := utils.MetaAccessor(obj)
if err != nil {
klog.Errorf("error getting object accessor: %s", err)
return watch.Error, nil, err
}
accessor.SetResourceVersionInt64(evt.Resource.Version)
} else if !prevMatches {
// if the object didn't previously match, send an Added event
watchAction = watch.Added
}
case resource.WatchEvent_DELETED:
watchAction = watch.Deleted
// if we have a previous object, return that in the deleted event
if evt.Previous != nil {
obj, err = d.toObject(evt.Previous)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return watch.Error, nil, err
}
// here k8s expects the previous object but with the new resource version
accessor, err := utils.MetaAccessor(obj)
if err != nil {
klog.Errorf("error getting object accessor: %s", err)
return watch.Error, nil, err
}
accessor.SetResourceVersionInt64(evt.Resource.Version)
}
// apply any predicates not handled in storage
matches, err := d.opts.Predicate.Matches(obj)
if err != nil {
klog.Errorf("error matching object: %s", err)
return watch.Error, nil, err
}
if !matches {
continue decode
}
default:
watchAction = watch.Error
}
return watchAction, obj, nil
}
}
func (d *streamDecoder) Close() {
err := d.client.CloseSend()
if err != nil {
klog.Errorf("error closing watch stream: %s", err)
}
}
var _ watch.Decoder = (*streamDecoder)(nil)

View File

@ -0,0 +1,119 @@
package entitybridge
import (
"errors"
"io"
"time"
grpcCodes "google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"github.com/grafana/grafana/pkg/apimachinery/utils"
entitystore "github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
type decoder struct {
client entity.EntityStore_WatchClient
}
// Any errors will end the stream
func (d *decoder) next() (*resource.WrittenEvent, error) {
decode:
for {
err := d.client.Context().Err()
if err != nil {
klog.Errorf("client: context error: %s\n", err)
return nil, err
}
rsp, err := d.client.Recv()
if errors.Is(err, io.EOF) {
return nil, err
}
if grpcStatus.Code(err) == grpcCodes.Canceled {
return nil, err
}
if err != nil {
klog.Errorf("client: error receiving result: %s", err)
return nil, err
}
if rsp.Entity == nil {
klog.Errorf("client: received nil entity\n")
continue decode
}
event := resource.WriteEvent{
Key: &resource.ResourceKey{
Group: rsp.Entity.Namespace,
Resource: rsp.Entity.Resource,
Namespace: rsp.Entity.Namespace,
Name: rsp.Entity.Name,
},
}
switch rsp.Entity.Action {
case entity.Entity_CREATED:
event.Type = resource.WatchEvent_ADDED
case entity.Entity_UPDATED:
event.Type = resource.WatchEvent_MODIFIED
case entity.Entity_DELETED:
event.Type = resource.WatchEvent_DELETED
default:
klog.Errorf("unsupported action\n")
continue decode
}
// Now decode the bytes into an object
obj := &unstructured.Unstructured{}
err = entitystore.EntityToRuntimeObject(rsp.Entity, obj, unstructured.UnstructuredJSONScheme)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return nil, err
}
event.Value, err = obj.MarshalJSON()
if err != nil {
return nil, err
}
event.Object, err = utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
// Decode the old value
if rsp.Previous != nil {
err = entitystore.EntityToRuntimeObject(rsp.Previous, obj, unstructured.UnstructuredJSONScheme)
if err != nil {
klog.Errorf("error decoding entity: %s", err)
return nil, err
}
event.ObjectOld, err = utils.MetaAccessor(obj)
if err != nil {
return nil, err
}
event.PreviousRV, err = event.ObjectOld.GetResourceVersionInt64()
if err != nil {
return nil, err
}
}
return &resource.WrittenEvent{
ResourceVersion: rsp.Entity.ResourceVersion,
Timestamp: time.Now().UnixMilli(),
WriteEvent: event,
}, nil
}
}
func (d *decoder) close() {
err := d.client.CloseSend()
if err != nil {
klog.Errorf("error closing watch stream: %s", err)
}
}

View File

@ -0,0 +1,5 @@
// Package entitybridge implements an ResourceServer using existing EntityAPI contracts
//
// This package will be removed and replaced with a more streamlined SQL implementation
// that leverages what we have learned from the entity deployments so far
package entitybridge

View File

@ -0,0 +1,237 @@
package entitybridge
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/klog/v2"
grafanaregistry "github.com/grafana/grafana/pkg/apiserver/registry/generic"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// Creates a ResourceServer using the existing entity tables
// NOTE: the server is optional and only used to pass init+close functions
func EntityAsResourceServer(client entity.EntityStoreClient, server sqlstash.SqlEntityServer, tracer tracing.Tracer) (resource.ResourceServer, error) {
if client == nil {
return nil, fmt.Errorf("client must be defined")
}
// Use this bridge as the resource store
bridge := &entityBridge{
client: client,
server: server,
}
return resource.NewResourceServer(resource.ResourceServerOptions{
Tracer: tracer,
Backend: bridge,
Diagnostics: bridge,
Lifecycle: bridge,
})
}
// This is only created if we use the entity implementation
type entityBridge struct {
client entity.EntityStoreClient
// When running directly
// (we need the explicit version so we have access to init+stop)
server sqlstash.SqlEntityServer
}
// Init implements ResourceServer.
func (b *entityBridge) Init() error {
if b.server != nil {
return b.server.Init()
}
return nil
}
// Stop implements ResourceServer.
func (b *entityBridge) Stop() {
if b.server != nil {
b.server.Stop()
}
}
// Convert resource key to the entity key
func toEntityKey(key *resource.ResourceKey) string {
e := grafanaregistry.Key{
Group: key.Group,
Resource: key.Resource,
Namespace: key.Namespace,
Name: key.Name,
}
return e.String()
}
func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) {
key := toEntityKey(event.Key)
// Delete does not need to create an entity first
if event.Type == resource.WatchEvent_DELETED {
rsp, err := b.client.Delete(ctx, &entity.DeleteEntityRequest{
Key: key,
PreviousVersion: event.PreviousRV,
})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
}
gvr := event.Object.GetGroupVersionKind()
obj := event.Object
msg := &entity.Entity{
Key: key,
Group: event.Key.Group,
Resource: event.Key.Resource,
Namespace: event.Key.Namespace,
Name: event.Key.Name,
Guid: string(event.Object.GetUID()),
GroupVersion: gvr.Version,
Folder: obj.GetFolder(),
Body: event.Value,
Message: event.Object.GetMessage(),
Labels: obj.GetLabels(),
Size: int64(len(event.Value)),
}
switch event.Type {
case resource.WatchEvent_ADDED:
msg.Action = entity.Entity_CREATED
rsp, err := b.client.Create(ctx, &entity.CreateEntityRequest{Entity: msg})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
case resource.WatchEvent_MODIFIED:
msg.Action = entity.Entity_UPDATED
rsp, err := b.client.Update(ctx, &entity.UpdateEntityRequest{
Entity: msg,
PreviousVersion: event.PreviousRV,
})
if err != nil {
return 0, err
}
return rsp.Entity.ResourceVersion, err
default:
}
return 0, fmt.Errorf("unsupported operation: %s", event.Type.String())
}
func (b *entityBridge) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
client, err := b.client.Watch(ctx)
if err != nil {
return nil, err
}
req := &entity.EntityWatchRequest{
Action: entity.EntityWatchRequest_START,
Labels: map[string]string{},
WithBody: true,
WithStatus: true,
SendInitialEvents: false,
}
err = client.Send(req)
if err != nil {
err2 := client.CloseSend()
if err2 != nil {
klog.Errorf("watch close failed: %s\n", err2)
}
return nil, err
}
reader := &decoder{client}
stream := make(chan *resource.WrittenEvent, 10)
go func() {
for {
evt, err := reader.next()
if err != nil {
reader.close()
close(stream)
return
}
stream <- evt
}
}()
return stream, nil
}
// IsHealthy implements ResourceServer.
func (b *entityBridge) IsHealthy(ctx context.Context, req *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
rsp, err := b.client.IsHealthy(ctx, &entity.HealthCheckRequest{
Service: req.Service, // ??
})
if err != nil {
return nil, err
}
return &resource.HealthCheckResponse{
Status: resource.HealthCheckResponse_ServingStatus(rsp.Status),
}, nil
}
// Read implements ResourceServer.
func (b *entityBridge) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) {
v, err := b.client.Read(ctx, &entity.ReadEntityRequest{
Key: toEntityKey(req.Key),
WithBody: true,
})
if err != nil {
return nil, err
}
return &resource.ReadResponse{
ResourceVersion: v.ResourceVersion,
Value: v.Body,
}, nil
}
// List implements ResourceServer.
func (b *entityBridge) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
key := req.Options.Key
query := &entity.EntityListRequest{
NextPageToken: req.NextPageToken,
Limit: req.Limit,
Key: []string{toEntityKey(key)},
WithBody: true,
}
if len(req.Options.Labels) > 0 {
query.Labels = make(map[string]string)
for _, q := range req.Options.Labels {
// The entity structure only supports equals
// the rest will be processed handled by the upstream predicate
op := selection.Operator(q.Operator)
if op == selection.Equals || op == selection.DoubleEquals {
query.Labels[q.Key] = q.Values[0]
}
}
}
found, err := b.client.List(ctx, query)
if err != nil {
return nil, err
}
rsp := &resource.ListResponse{
ResourceVersion: found.ResourceVersion,
NextPageToken: found.NextPageToken,
}
for _, item := range found.Results {
rsp.Items = append(rsp.Items, &resource.ResourceWrapper{
ResourceVersion: item.ResourceVersion,
Value: item.Body,
})
}
return rsp, nil
}

View File

@ -0,0 +1,10 @@
version: v1
plugins:
- plugin: go
out: pkg/storage/unified/resource
opt: paths=source_relative
- plugin: go-grpc
out: pkg/storage/unified/resource
opt:
- paths=source_relative
- require_unimplemented_servers=false

View File

@ -0,0 +1,7 @@
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULT

View File

@ -0,0 +1,285 @@
package resource
import (
"bytes"
context "context"
"fmt"
"io"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"gocloud.dev/blob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/memblob"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type CDKBackendOptions struct {
Tracer trace.Tracer
Bucket *blob.Bucket
RootFolder string
}
func NewCDKBackend(ctx context.Context, opts CDKBackendOptions) (StorageBackend, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("cdk-appending-store")
}
if opts.Bucket == nil {
return nil, fmt.Errorf("missing bucket")
}
found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{
Prefix: opts.RootFolder,
Delimiter: "/",
})
if err != nil {
return nil, err
}
if found == nil {
return nil, fmt.Errorf("the root folder does not exist")
}
backend := &cdkBackend{
tracer: opts.Tracer,
bucket: opts.Bucket,
root: opts.RootFolder,
}
backend.rv.Swap(time.Now().UnixMilli())
return backend, nil
}
type cdkBackend struct {
tracer trace.Tracer
bucket *blob.Bucket
root string
mutex sync.Mutex
rv atomic.Int64
// Simple watch stream -- NOTE, this only works for single tenant!
broadcaster Broadcaster[*WrittenEvent]
stream chan<- *WrittenEvent
}
func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string {
var buffer bytes.Buffer
buffer.WriteString(s.root)
if key.Group == "" {
return buffer.String()
}
buffer.WriteString(key.Group)
if key.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Resource)
if key.Namespace == "" {
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/__cluster__")
} else {
buffer.WriteString("/")
buffer.WriteString(key.Namespace)
}
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Name)
if rv > 0 {
buffer.WriteString(fmt.Sprintf("/%d.json", rv))
}
return buffer.String()
}
func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) {
// Scope the lock
{
s.mutex.Lock()
defer s.mutex.Unlock()
rv = s.rv.Add(1)
err = s.bucket.WriteAll(ctx, s.getPath(event.Key, rv), event.Value, &blob.WriterOptions{
ContentType: "application/json",
})
}
// Async notify all subscribers
if s.stream != nil {
go func() {
write := &WrittenEvent{
WriteEvent: event,
Timestamp: time.Now().UnixMilli(),
ResourceVersion: rv,
}
s.stream <- write
}()
}
return rv, err
}
func (s *cdkBackend) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
rv := req.ResourceVersion
path := s.getPath(req.Key, req.ResourceVersion)
if rv < 1 {
iter := s.bucket.List(&blob.ListOptions{Prefix: path + "/", Delimiter: "/"})
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
v, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil && v > rv {
rv = v
path = obj.Key // find the path with biggest resource version
}
}
}
}
}
raw, err := s.bucket.ReadAll(ctx, path)
if raw == nil || (err == nil && isDeletedMarker(raw)) {
return nil, apierrors.NewNotFound(schema.GroupResource{
Group: req.Key.Group,
Resource: req.Key.Resource,
}, req.Key.Name)
}
return &ReadResponse{
ResourceVersion: rv,
Value: raw,
}, err
}
func isDeletedMarker(raw []byte) bool {
if bytes.Contains(raw, []byte(`"DeletedMarker"`)) {
tmp := &unstructured.Unstructured{}
err := tmp.UnmarshalJSON(raw)
if err == nil && tmp.GetKind() == "DeletedMarker" {
return true
}
}
return false
}
func (s *cdkBackend) PrepareList(ctx context.Context, req *ListRequest) (*ListResponse, error) {
resources, err := buildTree(ctx, s, req.Options.Key)
if err != nil {
return nil, err
}
rsp := &ListResponse{
ResourceVersion: s.rv.Load(),
}
for _, item := range resources {
latest := item.versions[0]
raw, err := s.bucket.ReadAll(ctx, latest.key)
if err != nil {
return nil, err
}
if !isDeletedMarker(raw) {
rsp.Items = append(rsp.Items, &ResourceWrapper{
ResourceVersion: latest.rv,
Value: raw,
})
}
}
return rsp, nil
}
func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.broadcaster == nil {
var err error
s.broadcaster, err = NewBroadcaster(context.Background(), func(c chan<- *WrittenEvent) error {
s.stream = c
return nil
})
if err != nil {
return nil, err
}
}
return s.broadcaster.Subscribe(ctx)
}
// group > resource > namespace > name > versions
type cdkResource struct {
prefix string
versions []cdkVersion
}
type cdkVersion struct {
rv int64
key string
}
func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) ([]cdkResource, error) {
byPrefix := make(map[string]*cdkResource)
path := s.getPath(key, 0)
iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive
for {
obj, err := iter.Next(ctx)
if err == io.EOF {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
rv, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil {
prefix := obj.Key[:idx]
res, ok := byPrefix[prefix]
if !ok {
res = &cdkResource{prefix: prefix}
byPrefix[prefix] = res
}
res.versions = append(res.versions, cdkVersion{
rv: rv,
key: obj.Key,
})
}
}
}
}
// Now sort all versions
resources := make([]cdkResource, 0, len(byPrefix))
for _, res := range byPrefix {
sort.Slice(res.versions, func(i, j int) bool {
return res.versions[i].rv > res.versions[j].rv
})
resources = append(resources, *res)
}
sort.Slice(resources, func(i, j int) bool {
a := resources[i].versions[0].rv
b := resources[j].versions[0].rv
return a > b
})
return resources, nil
}

View File

@ -0,0 +1,30 @@
package resource
import (
"github.com/fullstorydev/grpchan"
"github.com/fullstorydev/grpchan/inprocgrpc"
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"google.golang.org/grpc"
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
)
func NewLocalResourceStoreClient(server ResourceStoreServer) ResourceStoreClient {
channel := &inprocgrpc.Channel{}
auth := &grpcUtils.Authenticator{}
channel.RegisterService(
grpchan.InterceptServer(
&ResourceStore_ServiceDesc,
grpcAuth.UnaryServerInterceptor(auth.Authenticate),
grpcAuth.StreamServerInterceptor(auth.Authenticate),
),
server,
)
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
}
func NewResourceStoreClientGRPC(channel *grpc.ClientConn) ResourceStoreClient {
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
}

View File

@ -0,0 +1,37 @@
package resource
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
// This object is written when an object is deleted
type DeletedMarker struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DeletedMarker) DeepCopyInto(out *DeletedMarker) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeletedMarker.
func (in *DeletedMarker) DeepCopy() *DeletedMarker {
if in == nil {
return nil
}
out := new(DeletedMarker)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *DeletedMarker) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

View File

@ -0,0 +1,2 @@
// Package resource creates a ResourceServer that handles generic storage operations
package resource

View File

@ -0,0 +1,92 @@
package resource
import (
context "context"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
type WriteEvent struct {
Type WatchEvent_Type // ADDED, MODIFIED, DELETED
Key *ResourceKey // the request key
PreviousRV int64 // only for Update+Delete
// The json payload (without resourceVersion)
Value []byte
// Access real fields
Object utils.GrafanaMetaAccessor
// Access to the old metadata
ObjectOld utils.GrafanaMetaAccessor
}
// WriteEvents after they include a resource version
type WrittenEvent struct {
WriteEvent
// The resource version
ResourceVersion int64
// Timestamp when the event is created
Timestamp int64
}
// A function to write events
type EventAppender = func(context.Context, *WriteEvent) (int64, error)
type writeEventBuilder struct {
EventID int64
Key *ResourceKey // the request key
Type WatchEvent_Type
Requester identity.Requester
Object *unstructured.Unstructured
// Access the raw metadata values
Meta utils.GrafanaMetaAccessor
OldMeta utils.GrafanaMetaAccessor
}
func newEventFromBytes(value, oldValue []byte) (*writeEventBuilder, error) {
builder := &writeEventBuilder{
Object: &unstructured.Unstructured{},
}
err := builder.Object.UnmarshalJSON(value)
if err != nil {
return nil, err
}
builder.Meta, err = utils.MetaAccessor(builder.Object)
if err != nil {
return nil, err
}
if oldValue == nil {
builder.Type = WatchEvent_ADDED
} else {
builder.Type = WatchEvent_MODIFIED
temp := &unstructured.Unstructured{}
err = temp.UnmarshalJSON(oldValue)
if err != nil {
return nil, err
}
builder.OldMeta, err = utils.MetaAccessor(temp)
if err != nil {
return nil, err
}
}
return builder, nil
}
func (b *writeEventBuilder) toEvent() (event WriteEvent, err error) {
event.Key = b.Key
event.Type = b.Type
event.ObjectOld = b.OldMeta
event.Object = b.Meta
event.Value, err = b.Object.MarshalJSON()
return // includes the named values
}

View File

@ -3,24 +3,63 @@ module github.com/grafana/grafana/pkg/storage/unified/resource
go 1.21.10
require (
github.com/fullstorydev/grpchan v1.1.1
github.com/google/uuid v1.6.0
github.com/grafana/authlib v0.0.0-20240611075137-331cbe4e840f
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/prometheus/client_golang v1.19.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel/trace v1.26.0
gocloud.dev v0.25.0
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
k8s.io/apimachinery v0.29.3
)
require (
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/storage v1.38.0 // indirect
github.com/aws/aws-sdk-go v1.51.31 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protocompile v0.4.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
github.com/jhump/protoreflect v1.15.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.14.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/api v0.176.0 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

View File

@ -1,20 +1,95 @@
cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM=
cloud.google.com/go/auth v0.2.2 h1:gmxNJs4YZYcw6YvKRtVBaF2fyUE6UrWPyzU8jHvYfmI=
cloud.google.com/go/auth/oauth2adapt v0.2.1 h1:VSPmMmUlT8CkIZ2PzD9AlLN+R3+D1clXMWHHa6vG/Ag=
cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU=
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
cloud.google.com/go/storage v1.38.0 h1:Az68ZRGlnNTpIBbLjSMIV2BDcwwXYlRlQzis0llkpJg=
github.com/aws/aws-sdk-go v1.51.31 h1:4TM+sNc+Dzs7wY1sJ0+J8i60c6rkgnKP1pvPx8ghsSY=
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 h1:SdK4Ppk5IzLs64ZMvr6MrSficMtjY2oS0WOORXTlxwU=
github.com/aws/aws-sdk-go-v2/config v1.15.3 h1:5AlQD0jhVXlGzwo+VORKiUuogkG7pQcLJNzIzK7eodw=
github.com/aws/aws-sdk-go-v2/credentials v1.11.2 h1:RQQ5fzclAKJyY5TvF+fkjJEwzK4hnxQCLOu5JXzDmQo=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.3 h1:LWPg5zjHV9oz/myQr4wMs0gi4CjnDN/ILmyZUFYXZsU=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.3 h1:ir7iEq78s4txFGgwcLqD6q9IIPzTQNRJXulJd9h/zQo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.9 h1:onz/VaaxZ7Z4V+WIN9Txly9XLTmoOh1oJ8XcAC3pako=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.3 h1:9stUQR/u2KXU6HkFJYlqnZEjBnbgrVbG6I5HN09xZh0=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.10 h1:by9P+oy3P/CwggN4ClnW2D4oL91QV7pBzBICi1chZvQ=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.1 h1:T4pFel53bkHjL2mMo+4DKE6r6AuoZnM0fg7k1/ratr4=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3 h1:I0dcwWitE752hVSMrsLCxqNQ+UdEp3nACx2bYNMQq+k=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3 h1:Gh1Gpyh01Yvn7ilO/b/hr01WgNpaszfbKMUgqM186xQ=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.3 h1:BKjwCJPnANbkwQ8vzSbaZDKawwagDubrH/z/c0X+kbQ=
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 h1:rMPtwA7zzkSQZhhz9U3/SoIDz/NZ7Q+iRn4EIO8rSyU=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 h1:frW4ikGcxfAEDfmQqWgMLp+F1n4nRo9sF39OcIb5BkQ=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 h1:cJGRyzCSVwZC7zZZ1xbx9m32UnrKydRYhOvcD1NYP9Q=
github.com/aws/smithy-go v1.11.2 h1:eG/N+CcUMAvsdffgMvjMKwfyDzIkjM6pfxMJ8Mzc6mE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/fullstorydev/grpchan v1.1.1 h1:heQqIJlAv5Cnks9a70GRL2EJke6QQoUB25VGR6TZQas=
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/gax-go/v2 v2.12.3 h1:5/zPPDvw8Q1SuXjrqrZslrqT7dL/uJT2CQii/cLCKqA=
github.com/grafana/authlib v0.0.0-20240611075137-331cbe4e840f h1:hvRCAv+TgcHu3i/Sd7lFJx84iEtgzDCYuk7OWeXatD0=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE=
github.com/prometheus/procfs v0.14.0 h1:Lw4VdGGoKEZilJsayHf0B+9YgLGREba2C6xr+Fdfq6s=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 h1:A3SayB3rNyt+1S6qpI9mHPkeHTZbD7XILEqWnYZb2l0=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI=
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=
go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30=
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA=
gocloud.dev v0.25.0 h1:Y7vDq8xj7SyM848KXf32Krda2e6jQ4CLh/mTeCSqXtk=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/oauth2 v0.20.0 h1:4mQdhULixXKP1rwYBW0vAijoXnkTG0BLCDRzfe1idMo=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
google.golang.org/api v0.176.0 h1:dHj1/yv5Dm/eQTXiP9hNCRT3xzJHWXeNdRq29XbMxoE=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 h1:1GBuWVLM/KMVUv1t1En5Gs+gFZCNd360GGb4sSxtrhU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:BwIjyKYGsK9dMCBOorzRri8MQwmi7mT9rGHsCEinZkA=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
k8s.io/apimachinery v0.29.3 h1:2tbx+5L7RNvqJjn7RIuIKu9XTsIZ9Z5wX2G22XAa5EU=
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=
k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=

View File

@ -39,14 +39,14 @@ func (f *Authenticator) Authenticate(ctx context.Context) (context.Context, erro
if !ok {
return nil, fmt.Errorf("no metadata found")
}
user, err := f.DecodeMetadata(ctx, md)
user, err := f.decodeMetadata(ctx, md)
if err != nil {
return nil, err
}
return identity.WithRequester(ctx, user), nil
}
func (f *Authenticator) DecodeMetadata(ctx context.Context, meta metadata.MD) (identity.Requester, error) {
func (f *Authenticator) decodeMetadata(ctx context.Context, meta metadata.MD) (identity.Requester, error) {
// Avoid NPE/panic with getting keys
getter := func(key string) string {
v := meta.Get(key)

View File

@ -23,7 +23,7 @@ func TestBasicEncodeDecode(t *testing.T) {
auth := &Authenticator{}
md := encodeIdentityInMetadata(before)
after, err := auth.DecodeMetadata(context.Background(), md)
after, err := auth.decodeMetadata(context.Background(), md)
require.NoError(t, err)
require.Equal(t, before.GetID(), after.GetID())
require.Equal(t, before.GetUID(), after.GetUID())

View File

@ -0,0 +1,45 @@
package resource
import (
context "context"
"fmt"
"github.com/grafana/grafana/pkg/apimachinery/identity"
)
type WriteAccessHooks struct {
// Check if a user has access to write folders
// When this is nil, no resources can have folders configured
Folder func(ctx context.Context, user identity.Requester, uid string) bool
// When configured, this will make sure a user is allowed to save to a given origin
Origin func(ctx context.Context, user identity.Requester, origin string) bool
}
type LifecycleHooks interface {
// Called once at initialization
Init() error
// Stop function -- after calling this, any additional storage functions may error
Stop()
}
func (a *WriteAccessHooks) CanWriteFolder(ctx context.Context, user identity.Requester, uid string) error {
if a.Folder == nil {
return fmt.Errorf("writing folders is not supported")
}
if !a.Folder(ctx, user, uid) {
return fmt.Errorf("not allowed to write resource to folder")
}
return nil
}
func (a *WriteAccessHooks) CanWriteOrigin(ctx context.Context, user identity.Requester, uid string) error {
if a.Origin == nil || uid == "UI" {
return nil // default to OK
}
if !a.Origin(ctx, user, uid) {
return fmt.Errorf("not allowed to write resource at origin")
}
return nil
}

View File

@ -0,0 +1,17 @@
package resource
func matchesQueryKey(query *ResourceKey, key *ResourceKey) bool {
if query.Group != key.Group {
return false
}
if query.Resource != key.Resource {
return false
}
if query.Namespace != "" && query.Namespace != key.Namespace {
return false
}
if query.Name != "" && query.Name != key.Name {
return false
}
return true
}

View File

@ -0,0 +1,21 @@
package resource
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestKeyMatching(t *testing.T) {
t.Run("key matching", func(t *testing.T) {
require.True(t, matchesQueryKey(&ResourceKey{
Group: "ggg",
Resource: "rrr",
Namespace: "ns",
}, &ResourceKey{
Group: "ggg",
Resource: "rrr",
Namespace: "ns",
}))
})
}

View File

@ -0,0 +1,41 @@
package resource
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
)
var (
once sync.Once
StorageServerMetrics *StorageApiMetrics
)
type StorageApiMetrics struct {
OptimisticLockFailed *prometheus.CounterVec
}
func NewStorageMetrics() *StorageApiMetrics {
once.Do(func() {
StorageServerMetrics = &StorageApiMetrics{
OptimisticLockFailed: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "resource_storage",
Name: "optimistic_lock_failed",
Help: "count of optimistic locks failed",
},
[]string{"action"},
),
}
})
return StorageServerMetrics
}
func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) {
s.OptimisticLockFailed.Collect(ch)
}
func (s *StorageApiMetrics) Describe(ch chan<- *prometheus.Desc) {
s.OptimisticLockFailed.Describe(ch)
}

View File

@ -0,0 +1,41 @@
package resource
import (
"context"
)
var (
_ DiagnosticsServer = &noopService{}
_ LifecycleHooks = &noopService{}
)
// noopService is a helper implementation to simplify tests
// It does nothing except return errors when asked to do anything real
type noopService struct{}
// Init implements ResourceServer.
func (n *noopService) Init() error {
return nil
}
// Stop implements ResourceServer.
func (n *noopService) Stop() {
// nothing
}
// IsHealthy implements ResourceServer.
func (n *noopService) IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return &HealthCheckResponse{
Status: HealthCheckResponse_SERVING,
}, nil
}
// Read implements ResourceServer.
func (n *noopService) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
return nil, ErrNotImplementedYet
}
// List implements ResourceServer.
func (n *noopService) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, ErrNotImplementedYet
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,303 @@
syntax = "proto3";
package resource;
option go_package = "github.com/grafana/grafana/pkg/storage/unified/resource";
message ResourceKey {
// Namespace (tenant)
string namespace = 2;
// Resource Group
string group = 1;
// The resource type
string resource = 3;
// Resource identifier (unique within namespace+group+resource)
string name = 4;
}
message ResourceWrapper {
// The resource version
int64 resource_version = 1;
// Full kubernetes json bytes (although the resource version may not be accurate)
bytes value = 2;
}
// The history and trash commands need access to commit messages
message ResourceMeta {
// The resource version
int64 resource_version = 1;
// Size of the full resource body
int32 size = 3;
// Hash for the resource
string hash = 4;
// The kubernetes metadata section (not the full resource)
// https://github.com/kubernetes/kubernetes/blob/v1.30.2/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L1496
bytes partial_object_meta = 6;
}
// Status structure is copied from:
// https://github.com/kubernetes/apimachinery/blob/v0.30.1/pkg/apis/meta/v1/generated.proto#L979
message StatusResult {
// Status of the operation.
// One of: "Success" or "Failure".
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
string status = 1;
// A human-readable description of the status of this operation.
// +optional
string message = 2;
// A machine-readable description of why this operation is in the
// "Failure" status. If this value is empty there
// is no information available. A Reason clarifies an HTTP status
// code but does not override it.
// +optional
string reason = 3;
// Suggested HTTP return code for this status, 0 if not set.
// +optional
int32 code = 4;
}
// ----------------------------------
// CRUD Objects
// ----------------------------------
message CreateRequest {
// Requires group+resource to be configuired
// If name is not set, a unique name will be generated
// The resourceVersion should not be set
ResourceKey key = 1;
// The resource JSON.
bytes value = 2;
}
message CreateResponse {
// Status code
StatusResult status = 1;
// The updated resource version
int64 resource_version = 2;
// The resource JSON. With managed annotations included
bytes value = 3;
}
message UpdateRequest {
// Full key must be set
ResourceKey key = 1;
// The current resource version
int64 resource_version = 2;
// The resource JSON.
bytes value = 3;
}
message UpdateResponse {
// Status code
StatusResult status = 1;
// The updated resource version
int64 resource_version = 2;
// The resource JSON. With managed annotations included
bytes value = 3;
}
message DeleteRequest {
ResourceKey key = 1;
// The current resource version
int64 resource_version = 2;
// Preconditions: make sure the uid matches the current saved value
// +optional
string uid = 3;
}
message DeleteResponse {
// Status code
StatusResult status = 1;
// The resource version for the deletion marker
int64 resource_version = 2;
}
message ReadRequest {
ResourceKey key = 1;
// Optionally pick an explicit resource version
int64 resource_version = 3;
}
message ReadResponse {
// Status code
StatusResult status = 1;
// The new resource version
int64 resource_version = 2;
// The properties
bytes value = 3;
}
// ----------------------------------
// List Request/Response
// ----------------------------------
// The label filtering requirements:
// https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/labels/selector.go#L141
message Requirement {
string key = 1;
string operator = 2; // See https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/selection/operator.go#L21
repeated string values = 3; // typically one value, but depends on the operator
}
message ListOptions {
// Group+Namespace+Resource (not name)
ResourceKey key = 1;
// (best effort) Match label
// Allowed to send more results than actually match because the filter will be appled
// to the resutls agin in the client. That time with the full field selector
repeated Requirement labels = 2;
// (best effort) fields matcher
// Allowed to send more results than actually match because the filter will be appled
// to the resutls agin in the client. That time with the full field selector
repeated Requirement fields = 3;
}
enum ResourceVersionMatch {
NotOlderThan = 0;
Exact = 1;
}
message ListRequest {
// Starting from the requested page (other query parameters must match!)
string next_page_token = 1;
// The resource version
int64 resource_version = 2;
// List options
ResourceVersionMatch version_match = 3;
// Maximum number of items to return
// NOTE responses will also be limited by the response payload size
int64 limit = 4;
// Filtering
ListOptions options = 5;
}
message ListResponse {
repeated ResourceWrapper items = 1;
// When more results exist, pass this in the next request
string next_page_token = 2;
// ResourceVersion of the list response
int64 resource_version = 3;
// remainingItemCount is the number of subsequent items in the list which are not included in this
// list response. If the list request contained label or field selectors, then the number of
// remaining items is unknown and the field will be left unset and omitted during serialization.
// If the list is complete (either because it is not chunking or because this is the last chunk),
// then there are no more remaining items and this field will be left unset and omitted during
// serialization.
//
// The intended use of the remainingItemCount is *estimating* the size of a collection. Clients
// should not rely on the remainingItemCount to be set or to be exact.
// +optional
int64 remaining_item_count = 4; // 0 won't be set either (no next page token)
}
message WatchRequest {
// ResourceVersion of last changes. Empty will default to full history
int64 since = 1;
// Additional options
ListOptions options = 3;
// Return initial events
bool send_initial_events = 4;
// When done with initial events, send a bookmark event
bool allow_watch_bookmarks = 5;
}
message WatchEvent {
enum Type {
UNKNOWN = 0;
ADDED = 1;
MODIFIED = 2;
DELETED = 3;
BOOKMARK = 4;
ERROR = 5;
}
message Resource {
int64 version = 1;
bytes value = 2;
}
// Timestamp the event was sent
int64 timestamp = 1;
// Timestamp the event was sent
Type type = 2;
// Resource version for the object
Resource resource = 3;
// Previous resource version (for update+delete)
Resource previous = 4;
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
// This provides the CRUD+List+Watch support needed for a k8s apiserver
// The semantics and behaviors of this service are constrained by kubernetes
// This does not understand the resource schemas, only deals with json bytes
// Clients should not use this interface directly; it is for use in API Servers
service ResourceStore {
rpc Read(ReadRequest) returns (ReadResponse);
rpc Create(CreateRequest) returns (CreateResponse);
rpc Update(UpdateRequest) returns (UpdateResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
rpc List(ListRequest) returns (ListResponse);
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
rpc Watch(WatchRequest) returns (stream WatchEvent);
}
// Clients can use this service directly
// NOTE: This is read only, and no read afer write guarantees
service Diagnostics {
// Check if the service is healthy
rpc IsHealthy(HealthCheckRequest) returns (HealthCheckResponse);
}

View File

@ -0,0 +1,445 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.4.0
// - protoc (unknown)
// source: resource.proto
package resource
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
ResourceStore_Read_FullMethodName = "/resource.ResourceStore/Read"
ResourceStore_Create_FullMethodName = "/resource.ResourceStore/Create"
ResourceStore_Update_FullMethodName = "/resource.ResourceStore/Update"
ResourceStore_Delete_FullMethodName = "/resource.ResourceStore/Delete"
ResourceStore_List_FullMethodName = "/resource.ResourceStore/List"
ResourceStore_Watch_FullMethodName = "/resource.ResourceStore/Watch"
)
// ResourceStoreClient is the client API for ResourceStore service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// This provides the CRUD+List+Watch support needed for a k8s apiserver
// The semantics and behaviors of this service are constrained by kubernetes
// This does not understand the resource schemas, only deals with json bytes
// Clients should not use this interface directly; it is for use in API Servers
type ResourceStoreClient interface {
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error)
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error)
}
type resourceStoreClient struct {
cc grpc.ClientConnInterface
}
func NewResourceStoreClient(cc grpc.ClientConnInterface) ResourceStoreClient {
return &resourceStoreClient{cc}
}
func (c *resourceStoreClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ReadResponse)
err := c.cc.Invoke(ctx, ResourceStore_Read_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CreateResponse)
err := c.cc.Invoke(ctx, ResourceStore_Create_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(UpdateResponse)
err := c.cc.Invoke(ctx, ResourceStore_Update_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(DeleteResponse)
err := c.cc.Invoke(ctx, ResourceStore_Delete_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListResponse)
err := c.cc.Invoke(ctx, ResourceStore_List_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &ResourceStore_ServiceDesc.Streams[0], ResourceStore_Watch_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &resourceStoreWatchClient{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type ResourceStore_WatchClient interface {
Recv() (*WatchEvent, error)
grpc.ClientStream
}
type resourceStoreWatchClient struct {
grpc.ClientStream
}
func (x *resourceStoreWatchClient) Recv() (*WatchEvent, error) {
m := new(WatchEvent)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ResourceStoreServer is the server API for ResourceStore service.
// All implementations should embed UnimplementedResourceStoreServer
// for forward compatibility
//
// This provides the CRUD+List+Watch support needed for a k8s apiserver
// The semantics and behaviors of this service are constrained by kubernetes
// This does not understand the resource schemas, only deals with json bytes
// Clients should not use this interface directly; it is for use in API Servers
type ResourceStoreServer interface {
Read(context.Context, *ReadRequest) (*ReadResponse, error)
Create(context.Context, *CreateRequest) (*CreateResponse, error)
Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
List(context.Context, *ListRequest) (*ListResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
Watch(*WatchRequest, ResourceStore_WatchServer) error
}
// UnimplementedResourceStoreServer should be embedded to have forward compatible implementations.
type UnimplementedResourceStoreServer struct {
}
func (UnimplementedResourceStoreServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented")
}
func (UnimplementedResourceStoreServer) Create(context.Context, *CreateRequest) (*CreateResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Create not implemented")
}
func (UnimplementedResourceStoreServer) Update(context.Context, *UpdateRequest) (*UpdateResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Update not implemented")
}
func (UnimplementedResourceStoreServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedResourceStoreServer) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedResourceStoreServer) Watch(*WatchRequest, ResourceStore_WatchServer) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
// UnsafeResourceStoreServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ResourceStoreServer will
// result in compilation errors.
type UnsafeResourceStoreServer interface {
mustEmbedUnimplementedResourceStoreServer()
}
func RegisterResourceStoreServer(s grpc.ServiceRegistrar, srv ResourceStoreServer) {
s.RegisterService(&ResourceStore_ServiceDesc, srv)
}
func _ResourceStore_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Read(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Read_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Read(ctx, req.(*ReadRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Create(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Create_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Create(ctx, req.(*CreateRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Update(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Update_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Update(ctx, req.(*UpdateRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Delete_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Delete(ctx, req.(*DeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_List_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).List(ctx, req.(*ListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(WatchRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(ResourceStoreServer).Watch(m, &resourceStoreWatchServer{ServerStream: stream})
}
type ResourceStore_WatchServer interface {
Send(*WatchEvent) error
grpc.ServerStream
}
type resourceStoreWatchServer struct {
grpc.ServerStream
}
func (x *resourceStoreWatchServer) Send(m *WatchEvent) error {
return x.ServerStream.SendMsg(m)
}
// ResourceStore_ServiceDesc is the grpc.ServiceDesc for ResourceStore service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ResourceStore_ServiceDesc = grpc.ServiceDesc{
ServiceName: "resource.ResourceStore",
HandlerType: (*ResourceStoreServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Read",
Handler: _ResourceStore_Read_Handler,
},
{
MethodName: "Create",
Handler: _ResourceStore_Create_Handler,
},
{
MethodName: "Update",
Handler: _ResourceStore_Update_Handler,
},
{
MethodName: "Delete",
Handler: _ResourceStore_Delete_Handler,
},
{
MethodName: "List",
Handler: _ResourceStore_List_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Watch",
Handler: _ResourceStore_Watch_Handler,
ServerStreams: true,
},
},
Metadata: "resource.proto",
}
const (
Diagnostics_IsHealthy_FullMethodName = "/resource.Diagnostics/IsHealthy"
)
// DiagnosticsClient is the client API for Diagnostics service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// Clients can use this service directly
// NOTE: This is read only, and no read afer write guarantees
type DiagnosticsClient interface {
// Check if the service is healthy
IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
}
type diagnosticsClient struct {
cc grpc.ClientConnInterface
}
func NewDiagnosticsClient(cc grpc.ClientConnInterface) DiagnosticsClient {
return &diagnosticsClient{cc}
}
func (c *diagnosticsClient) IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, Diagnostics_IsHealthy_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// DiagnosticsServer is the server API for Diagnostics service.
// All implementations should embed UnimplementedDiagnosticsServer
// for forward compatibility
//
// Clients can use this service directly
// NOTE: This is read only, and no read afer write guarantees
type DiagnosticsServer interface {
// Check if the service is healthy
IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
}
// UnimplementedDiagnosticsServer should be embedded to have forward compatible implementations.
type UnimplementedDiagnosticsServer struct {
}
func (UnimplementedDiagnosticsServer) IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method IsHealthy not implemented")
}
// UnsafeDiagnosticsServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DiagnosticsServer will
// result in compilation errors.
type UnsafeDiagnosticsServer interface {
mustEmbedUnimplementedDiagnosticsServer()
}
func RegisterDiagnosticsServer(s grpc.ServiceRegistrar, srv DiagnosticsServer) {
s.RegisterService(&Diagnostics_ServiceDesc, srv)
}
func _Diagnostics_IsHealthy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthCheckRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DiagnosticsServer).IsHealthy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Diagnostics_IsHealthy_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DiagnosticsServer).IsHealthy(ctx, req.(*HealthCheckRequest))
}
return interceptor(ctx, in, info, handler)
}
// Diagnostics_ServiceDesc is the grpc.ServiceDesc for Diagnostics service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Diagnostics_ServiceDesc = grpc.ServiceDesc{
ServiceName: "resource.Diagnostics",
HandlerType: (*DiagnosticsServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "IsHealthy",
Handler: _Diagnostics_IsHealthy_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "resource.proto",
}

View File

@ -0,0 +1,549 @@
package resource
import (
context "context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
// Package-level errors.
var (
ErrNotFound = errors.New("entity not found")
ErrOptimisticLockingFailed = errors.New("optimistic locking failed")
ErrUserNotFoundInContext = errors.New("user not found in context")
ErrUnableToReadResourceJSON = errors.New("unable to read resource json")
ErrNotImplementedYet = errors.New("not implemented yet")
)
// ResourceServer implements all services
type ResourceServer interface {
ResourceStoreServer
DiagnosticsServer
LifecycleHooks
}
// The StorageBackend is an internal abstraction that supports interacting with
// the underlying raw storage medium. This interface is never exposed directly,
// it is provided by concrete instances that actually write values.
type StorageBackend interface {
// Write a Create/Update/Delete,
// NOTE: the contents of WriteEvent have been validated
// Return the revisionVersion for this event or error
WriteEvent(context.Context, WriteEvent) (int64, error)
// Read a value from storage optionally at an explicit version
Read(context.Context, *ReadRequest) (*ReadResponse, error)
// When the ResourceServer executes a List request, it will first
// query the backend for potential results. All results will be
// checked against the kubernetes requirements before finally returning
// results. The list options can be used to improve performance
// but are the the final answer.
PrepareList(context.Context, *ListRequest) (*ListResponse, error)
// Get all events from the store
// For HA setups, this will be more events than the local WriteEvent above!
WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error)
}
type ResourceServerOptions struct {
// OTel tracer
Tracer trace.Tracer
// Real storage backend
Backend StorageBackend
// Diagnostics
Diagnostics DiagnosticsServer
// Check if a user has access to write folders
// When this is nil, no resources can have folders configured
WriteAccess WriteAccessHooks
// Callbacks for startup and shutdown
Lifecycle LifecycleHooks
// Get the current time in unix millis
Now func() int64
}
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("resource-server")
}
if opts.Backend == nil {
return nil, fmt.Errorf("missing Backend implementation")
}
if opts.Diagnostics == nil {
opts.Diagnostics = &noopService{}
}
if opts.Now == nil {
opts.Now = func() int64 {
return time.Now().UnixMilli()
}
}
// Make this cancelable
ctx, cancel := context.WithCancel(identity.WithRequester(context.Background(),
&identity.StaticRequester{
Namespace: identity.NamespaceServiceAccount,
Login: "watcher", // admin user for watch
UserID: 1,
IsGrafanaAdmin: true,
}))
return &server{
tracer: opts.Tracer,
log: slog.Default().With("logger", "resource-server"),
backend: opts.Backend,
diagnostics: opts.Diagnostics,
access: opts.WriteAccess,
lifecycle: opts.Lifecycle,
now: opts.Now,
ctx: ctx,
cancel: cancel,
}, nil
}
var _ ResourceServer = &server{}
type server struct {
tracer trace.Tracer
log *slog.Logger
backend StorageBackend
diagnostics DiagnosticsServer
access WriteAccessHooks
lifecycle LifecycleHooks
now func() int64
// Background watch task -- this has permissions for everything
ctx context.Context
cancel context.CancelFunc
broadcaster Broadcaster[*WrittenEvent]
// init checking
once sync.Once
initErr error
}
// Init implements ResourceServer.
func (s *server) Init() error {
s.once.Do(func() {
// Call lifecycle hooks
if s.lifecycle != nil {
err := s.lifecycle.Init()
if err != nil {
s.initErr = fmt.Errorf("initialize Resource Server: %w", err)
}
}
// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
}
if s.initErr != nil {
s.log.Error("error initializing resource server", "error", s.initErr)
}
})
return s.initErr
}
func (s *server) Stop() {
s.initErr = fmt.Errorf("service is stopping")
if s.lifecycle != nil {
s.lifecycle.Stop()
}
// Stops the streaming
s.cancel()
// mark the value as done
s.initErr = fmt.Errorf("service is stopped")
}
// Old value indicates an update -- otherwise a create
func (s *server) newEventBuilder(ctx context.Context, key *ResourceKey, value, oldValue []byte) (*writeEventBuilder, error) {
event, err := newEventFromBytes(value, oldValue)
if err != nil {
return nil, err
}
event.Key = key
event.Requester, err = identity.GetRequester(ctx)
if err != nil {
return nil, ErrUserNotFoundInContext
}
obj := event.Meta
if key.Namespace != obj.GetNamespace() {
return nil, apierrors.NewBadRequest("key/namespace do not match")
}
gvk := obj.GetGroupVersionKind()
if gvk.Kind == "" {
return nil, apierrors.NewBadRequest("expecting resources with a kind in the body")
}
if gvk.Version == "" {
return nil, apierrors.NewBadRequest("expecting resources with an apiVersion")
}
if gvk.Group != "" && gvk.Group != key.Group {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("group in key does not match group in the body (%s != %s)", key.Group, gvk.Group),
)
}
// This needs to be a create function
if key.Name == "" {
if obj.GetName() == "" {
return nil, apierrors.NewBadRequest("missing name")
}
key.Name = obj.GetName()
} else if key.Name != obj.GetName() {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("key/name do not match (key: %s, name: %s)", key.Name, obj.GetName()))
}
obj.SetGenerateName("")
err = validateName(obj.GetName())
if err != nil {
return nil, err
}
folder := obj.GetFolder()
if folder != "" {
err = s.access.CanWriteFolder(ctx, event.Requester, folder)
if err != nil {
return nil, err
}
}
origin, err := obj.GetOriginInfo()
if err != nil {
return nil, apierrors.NewBadRequest("invalid origin info")
}
if origin != nil {
err = s.access.CanWriteOrigin(ctx, event.Requester, origin.Name)
if err != nil {
return nil, err
}
}
obj.SetOriginInfo(origin)
// Ensure old values do not mutate things they should not
if event.OldMeta != nil {
old := event.OldMeta
obj.SetUID(old.GetUID())
obj.SetCreatedBy(old.GetCreatedBy())
obj.SetCreationTimestamp(old.GetCreationTimestamp())
}
return event, nil
}
func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
rsp := &CreateResponse{}
builder, err := s.newEventBuilder(ctx, req.Key, req.Value, nil)
if err != nil {
rsp.Status, err = errToStatus(err)
return rsp, err
}
obj := builder.Meta
obj.SetCreatedBy(builder.Requester.GetUID().String())
obj.SetUpdatedBy("")
obj.SetUpdatedTimestamp(nil)
obj.SetCreationTimestamp(metav1.NewTime(time.UnixMilli(s.now())))
obj.SetUID(types.UID(uuid.New().String()))
event, err := builder.toEvent()
if err != nil {
rsp.Status, err = errToStatus(err)
return rsp, err
}
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
if err == nil {
rsp.Value = event.Value // with mutated fields
} else {
rsp.Status, err = errToStatus(err)
}
return rsp, err
}
// Convert golang errors to status result errors that can be returned to a client
func errToStatus(err error) (*StatusResult, error) {
if err != nil {
apistatus, ok := err.(apierrors.APIStatus)
if ok {
s := apistatus.Status()
return &StatusResult{
Status: s.Status,
Message: s.Message,
Reason: string(s.Reason),
Code: s.Code,
}, nil
}
// TODO... better conversion!!!
return &StatusResult{
Status: "Failure",
Message: err.Error(),
Code: 500,
}, nil
}
return nil, err
}
func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
rsp := &UpdateResponse{}
if req.ResourceVersion < 0 {
rsp.Status, _ = errToStatus(apierrors.NewBadRequest("update must include the previous version"))
return rsp, nil
}
latest, err := s.backend.Read(ctx, &ReadRequest{
Key: req.Key,
})
if err != nil {
return nil, err
}
if latest.Value == nil {
return nil, apierrors.NewBadRequest("current value does not exist")
}
builder, err := s.newEventBuilder(ctx, req.Key, req.Value, latest.Value)
if err != nil {
rsp.Status, err = errToStatus(err)
return rsp, err
}
obj := builder.Meta
obj.SetUpdatedBy(builder.Requester.GetUID().String())
obj.SetUpdatedTimestampMillis(time.Now().UnixMilli())
event, err := builder.toEvent()
if err != nil {
rsp.Status, err = errToStatus(err)
return rsp, err
}
event.Type = WatchEvent_MODIFIED
event.PreviousRV = latest.ResourceVersion
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
rsp.Status, err = errToStatus(err)
if err == nil {
rsp.Value = event.Value // with mutated fields
} else {
rsp.Status, err = errToStatus(err)
}
return rsp, err
}
func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
rsp := &DeleteResponse{}
if req.ResourceVersion < 0 {
return nil, apierrors.NewBadRequest("update must include the previous version")
}
latest, err := s.backend.Read(ctx, &ReadRequest{
Key: req.Key,
})
if err != nil {
return nil, err
}
if req.ResourceVersion > 0 && latest.ResourceVersion != req.ResourceVersion {
return nil, ErrOptimisticLockingFailed
}
now := metav1.NewTime(time.UnixMilli(s.now()))
event := WriteEvent{
Key: req.Key,
Type: WatchEvent_DELETED,
PreviousRV: latest.ResourceVersion,
}
requester, err := identity.GetRequester(ctx)
if err != nil {
return nil, apierrors.NewBadRequest("unable to get user")
}
marker := &DeletedMarker{}
err = json.Unmarshal(latest.Value, marker)
if err != nil {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("unable to read previous object, %v", err))
}
obj, err := utils.MetaAccessor(marker)
if err != nil {
return nil, err
}
obj.SetDeletionTimestamp(&now)
obj.SetUpdatedTimestamp(&now.Time)
obj.SetManagedFields(nil)
obj.SetFinalizers(nil)
obj.SetUpdatedBy(requester.GetUID().String())
marker.TypeMeta = metav1.TypeMeta{
Kind: "DeletedMarker",
APIVersion: "common.grafana.app/v0alpha1", // ?? or can we stick this in common?
}
marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV)
event.Value, err = json.Marshal(marker)
if err != nil {
return nil, apierrors.NewBadRequest(
fmt.Sprintf("unable creating deletion marker, %v", err))
}
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event)
rsp.Status, err = errToStatus(err)
return rsp, err
}
func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
if err := s.Init(); err != nil {
return nil, err
}
// if req.Key.Group == "" {
// status, _ := errToStatus(apierrors.NewBadRequest("missing group"))
// return &ReadResponse{Status: status}, nil
// }
if req.Key.Resource == "" {
status, _ := errToStatus(apierrors.NewBadRequest("missing resource"))
return &ReadResponse{Status: status}, nil
}
rsp, err := s.backend.Read(ctx, req)
if err != nil {
if rsp == nil {
rsp = &ReadResponse{}
}
rsp.Status, err = errToStatus(err)
}
return rsp, err
}
func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
if err := s.Init(); err != nil {
return nil, err
}
rsp, err := s.backend.PrepareList(ctx, req)
// Status???
return rsp, err
}
func (s *server) initWatcher() error {
var err error
s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {
events, err := s.backend.WatchWriteEvents(s.ctx)
if err != nil {
return err
}
go func() {
for {
// pipe all events
v := <-events
out <- v
}
}()
return nil
})
return err
}
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
if err := s.Init(); err != nil {
return err
}
ctx := srv.Context()
// Start listening -- this will buffer any changes that happen while we backfill
stream, err := s.broadcaster.Subscribe(ctx)
if err != nil {
return err
}
defer s.broadcaster.Unsubscribe(stream)
since := req.Since
if req.SendInitialEvents {
fmt.Printf("TODO... query\n")
// All initial events are CREATE
if req.AllowWatchBookmarks {
fmt.Printf("TODO... send bookmark\n")
}
}
for {
select {
case <-ctx.Done():
return nil
case event, ok := <-stream:
if !ok {
s.log.Debug("watch events closed")
return nil
}
if event.ResourceVersion > since && matchesQueryKey(req.Options.Key, event.Key) {
// Currently sending *every* event
// if req.Options.Labels != nil {
// // match *either* the old or new object
// }
// TODO: return values that match either the old or the new
srv.Send(&WatchEvent{
Timestamp: event.Timestamp,
Type: event.Type,
Resource: &WatchEvent_Resource{
Value: event.Value,
Version: event.ResourceVersion,
},
// TODO... previous???
})
}
}
}
}
// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) {
if err := s.Init(); err != nil {
return nil, err
}
return s.diagnostics.IsHealthy(ctx, req)
}

View File

@ -0,0 +1,167 @@
package resource
import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"gocloud.dev/blob/fileblob"
"gocloud.dev/blob/memblob"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
)
func TestSimpleServer(t *testing.T) {
testUserA := &identity.StaticRequester{
Namespace: identity.NamespaceUser,
Login: "testuser",
UserID: 123,
UserUID: "u123",
OrgRole: identity.RoleAdmin,
IsGrafanaAdmin: true, // can do anything
}
ctx := identity.WithRequester(context.Background(), testUserA)
bucket := memblob.OpenBucket(nil)
if false {
tmp, err := os.MkdirTemp("", "xxx-*")
require.NoError(t, err)
bucket, err = fileblob.OpenBucket(tmp, &fileblob.Options{
CreateDir: true,
Metadata: fileblob.MetadataDontWrite, // skip
})
require.NoError(t, err)
fmt.Printf("ROOT: %s\n\n", tmp)
}
store, err := NewCDKBackend(ctx, CDKBackendOptions{
Bucket: bucket,
})
require.NoError(t, err)
server, err := NewResourceServer(ResourceServerOptions{
Backend: store,
})
require.NoError(t, err)
t.Run("playlist happy CRUD paths", func(t *testing.T) {
raw := []byte(`{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "fdgsv37qslr0ga",
"namespace": "default",
"annotations": {
"grafana.app/originName": "elsewhere",
"grafana.app/originPath": "path/to/item",
"grafana.app/originTimestamp": "2024-02-02T00:00:00Z"
}
},
"spec": {
"title": "hello",
"interval": "5m",
"items": [
{
"type": "dashboard_by_uid",
"value": "vmie2cmWz"
}
]
}
}`)
key := &ResourceKey{
Group: "playlist.grafana.app",
Resource: "rrrr", // can be anything :(
Namespace: "default",
Name: "fdgsv37qslr0ga",
}
// Should be empty when we start
all, err := server.List(ctx, &ListRequest{Options: &ListOptions{
Key: &ResourceKey{
Group: key.Group,
Resource: key.Resource,
},
}})
require.NoError(t, err)
require.Len(t, all.Items, 0)
created, err := server.Create(ctx, &CreateRequest{
Value: raw,
Key: key,
})
require.NoError(t, err)
require.Nil(t, created.Status)
require.True(t, created.ResourceVersion > 0)
// The key does not include resource version
found, err := server.Read(ctx, &ReadRequest{Key: key})
require.NoError(t, err)
require.Nil(t, found.Status)
require.Equal(t, created.ResourceVersion, found.ResourceVersion)
// Now update the value
tmp := &unstructured.Unstructured{}
err = json.Unmarshal(created.Value, tmp)
require.NoError(t, err)
now := time.Now().UnixMilli()
obj, err := utils.MetaAccessor(tmp)
require.NoError(t, err)
obj.SetAnnotation("test", "hello")
obj.SetUpdatedTimestampMillis(now)
obj.SetUpdatedBy(testUserA.GetUID().String())
raw, err = json.Marshal(tmp)
require.NoError(t, err)
updated, err := server.Update(ctx, &UpdateRequest{
Key: key,
Value: raw,
ResourceVersion: created.ResourceVersion})
require.NoError(t, err)
require.Nil(t, updated.Status)
require.True(t, updated.ResourceVersion > created.ResourceVersion)
// We should still get the latest
found, err = server.Read(ctx, &ReadRequest{Key: key})
require.NoError(t, err)
require.Nil(t, found.Status)
require.Equal(t, updated.ResourceVersion, found.ResourceVersion)
all, err = server.List(ctx, &ListRequest{Options: &ListOptions{
Key: &ResourceKey{
Group: key.Group,
Resource: key.Resource,
},
}})
require.NoError(t, err)
require.Len(t, all.Items, 1)
require.Equal(t, updated.ResourceVersion, all.Items[0].ResourceVersion)
deleted, err := server.Delete(ctx, &DeleteRequest{Key: key, ResourceVersion: updated.ResourceVersion})
require.NoError(t, err)
require.True(t, deleted.ResourceVersion > updated.ResourceVersion)
// We should get not found status when trying to read the latest value
found, err = server.Read(ctx, &ReadRequest{Key: key})
require.NoError(t, err)
require.NotNil(t, found.Status)
require.Equal(t, int32(404), found.Status.Code)
// And the deleted value should not be in the results
all, err = server.List(ctx, &ListRequest{Options: &ListOptions{
Key: &ResourceKey{
Group: key.Group,
Resource: key.Resource,
},
}})
require.NoError(t, err)
require.Len(t, all.Items, 0) // empty
})
}

View File

@ -0,0 +1,25 @@
package resource
import (
"fmt"
"regexp"
)
var validNameCharPattern = `a-zA-Z0-9\-\_`
var validNamePattern = regexp.MustCompile(`^[` + validNameCharPattern + `]*$`).MatchString
func validateName(name string) error {
if len(name) < 2 {
return fmt.Errorf("name is too short")
}
if len(name) > 64 {
return fmt.Errorf("name is too long")
}
if !validNamePattern(name) {
return fmt.Errorf("name includes invalid characters")
}
// In standard k8s, it must not start with a number
// however that would force us to update many many many existing resources
// so we will be slightly more lenient than standard k8s
return nil
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/client-go/rest"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/server"
@ -158,14 +159,11 @@ func (c *K8sResourceClient) SanitizeJSON(v *unstructured.Unstructured) string {
if anno["grafana.app/originHash"] != "" {
anno["grafana.app/originHash"] = "${originHash}"
}
if anno["grafana.app/updatedTimestamp"] != "" {
anno["grafana.app/updatedTimestamp"] = "${updatedTimestamp}"
}
// Remove annotations that are not added by legacy storage
delete(anno, "grafana.app/originTimestamp")
delete(anno, "grafana.app/createdBy")
delete(anno, "grafana.app/updatedBy")
delete(anno, "grafana.app/action")
delete(anno, utils.AnnoKeyOriginTimestamp)
delete(anno, utils.AnnoKeyCreatedBy)
delete(anno, utils.AnnoKeyUpdatedBy)
delete(anno, utils.AnnoKeyUpdatedTimestamp)
deep.SetAnnotations(anno)
copy := deep.Object

View File

@ -361,8 +361,7 @@ func doPlaylistTests(t *testing.T, helper *apis.K8sTestHelper) *apis.K8sTestHelp
"metadata": {
"annotations": {
"grafana.app/originPath": "${originPath}",
"grafana.app/originName": "SQL",
"grafana.app/updatedTimestamp": "${updatedTimestamp}"
"grafana.app/originName": "SQL"
},
"creationTimestamp": "${creationTimestamp}",
"name": "` + uid + `",