Store: protobuf based GRN/identifier (#57714)

This commit is contained in:
Ryan McKinley 2022-10-31 07:26:16 -07:00 committed by GitHub
parent 46093c1267
commit 3527cad9dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1014 additions and 614 deletions

View File

@ -45,6 +45,12 @@ const (
// ExternalEntityReferenceRuntime_Transformer is a "type" under runtime
// UIDs include: joinByField, organize, seriesToColumns, etc
ExternalEntityReferenceRuntime_Transformer = "transformer"
// ObjectStoreScopeEntity is organized in: {kind}/{uid}
ObjectStoreScopeEntity = "entity"
// ObjectStoreScopeDrive is organized in: {uid/with/slash}.{kind}
ObjectStoreScopeDrive = "drive"
)
// ObjectKindInfo describes information needed from the object store

View File

@ -109,8 +109,11 @@ func (e *objectStoreJob) start() {
}
_, err = e.store.Write(ctx, &object.WriteObjectRequest{
UID: fmt.Sprintf("export/%s", dash.UID),
Kind: models.StandardKindDashboard,
GRN: &object.GRN{
Scope: models.ObjectStoreScopeEntity,
UID: dash.UID,
Kind: models.StandardKindDashboard,
},
Body: dash.Body,
Comment: "export from dashboard table",
})
@ -149,8 +152,11 @@ func (e *objectStoreJob) start() {
}
_, err = e.store.Write(ctx, &object.WriteObjectRequest{
UID: fmt.Sprintf("export/%s", playlist.Uid),
Kind: models.StandardKindPlaylist,
GRN: &object.GRN{
Scope: models.ObjectStoreScopeEntity,
UID: playlist.Uid,
Kind: models.StandardKindPlaylist,
},
Body: prettyJSON(playlist),
Comment: "export from playlists",
})

View File

@ -58,8 +58,10 @@ func (s *objectStoreImpl) sync() {
}
body, _ := json.Marshal(dto)
_, _ = s.objectstore.Write(ctx, &object.WriteObjectRequest{
UID: uid,
Kind: models.StandardKindPlaylist,
GRN: &object.GRN{
UID: uid,
Kind: models.StandardKindPlaylist,
},
Body: body,
})
}
@ -73,8 +75,11 @@ func (s *objectStoreImpl) Create(ctx context.Context, cmd *playlist.CreatePlayli
return rsp, fmt.Errorf("unable to write playlist to store")
}
_, err = s.objectstore.Write(ctx, &object.WriteObjectRequest{
UID: rsp.UID,
Kind: models.StandardKindPlaylist,
GRN: &object.GRN{
Scope: models.ObjectStoreScopeEntity,
Kind: models.StandardKindPlaylist,
UID: rsp.UID,
},
Body: body,
})
if err != nil {
@ -92,8 +97,10 @@ func (s *objectStoreImpl) Update(ctx context.Context, cmd *playlist.UpdatePlayli
return rsp, fmt.Errorf("unable to write playlist to store")
}
_, err = s.objectstore.Write(ctx, &object.WriteObjectRequest{
UID: rsp.Uid,
Kind: models.StandardKindPlaylist,
GRN: &object.GRN{
UID: rsp.Uid,
Kind: models.StandardKindPlaylist,
},
Body: body,
})
if err != nil {
@ -107,8 +114,10 @@ func (s *objectStoreImpl) Delete(ctx context.Context, cmd *playlist.DeletePlayli
err := s.sqlimpl.store.Delete(ctx, cmd)
if err == nil {
_, err = s.objectstore.Delete(ctx, &object.DeleteObjectRequest{
UID: cmd.UID,
Kind: models.StandardKindPlaylist,
GRN: &object.GRN{
UID: cmd.UID,
Kind: models.StandardKindPlaylist,
},
})
if err != nil {
return fmt.Errorf("unable to delete playlist to store")
@ -136,8 +145,10 @@ func (s *objectStoreImpl) GetWithoutItems(ctx context.Context, q *playlist.GetPl
func (s *objectStoreImpl) Get(ctx context.Context, q *playlist.GetPlaylistByUidQuery) (*playlist.PlaylistDTO, error) {
rsp, err := s.objectstore.Read(ctx, &object.ReadObjectRequest{
UID: q.UID,
Kind: models.StandardKindPlaylist,
GRN: &object.GRN{
UID: q.UID,
Kind: models.StandardKindPlaylist,
},
WithBody: true,
})
if err != nil {
@ -170,7 +181,7 @@ func (s *objectStoreImpl) Search(ctx context.Context, q *playlist.GetPlaylistsQu
err = json.Unmarshal(res.Body, found)
}
playlists = append(playlists, &playlist.Playlist{
UID: res.UID,
UID: res.GRN.UID,
Name: res.Name,
Interval: found.Interval,
})

View File

@ -21,6 +21,7 @@ type KindRegistry interface {
Register(info models.ObjectKindInfo, builder models.ObjectSummaryBuilder) error
GetSummaryBuilder(kind string) models.ObjectSummaryBuilder
GetInfo(kind string) (models.ObjectKindInfo, error)
GetFromExtension(suffix string) (models.ObjectKindInfo, error)
GetKinds() []models.ObjectKindInfo
}
@ -84,20 +85,26 @@ type kindValues struct {
}
type registry struct {
mutex sync.RWMutex
kinds map[string]*kindValues
info []models.ObjectKindInfo
mutex sync.RWMutex
kinds map[string]*kindValues
info []models.ObjectKindInfo
suffix map[string]models.ObjectKindInfo
}
func (r *registry) updateInfoArray() {
suffix := make(map[string]models.ObjectKindInfo)
info := make([]models.ObjectKindInfo, 0, len(r.kinds))
for _, v := range r.kinds {
info = append(info, v.info)
if v.info.FileExtension != "" {
suffix[v.info.FileExtension] = v.info
}
}
sort.Slice(info, func(i, j int) bool {
return info[i].ID < info[j].ID
})
r.info = info
r.suffix = suffix
}
func (r *registry) Register(info models.ObjectKindInfo, builder models.ObjectSummaryBuilder) error {
@ -144,6 +151,18 @@ func (r *registry) GetInfo(kind string) (models.ObjectKindInfo, error) {
return models.ObjectKindInfo{}, fmt.Errorf("not found")
}
// GetInfo returns the registered info
func (r *registry) GetFromExtension(suffix string) (models.ObjectKindInfo, error) {
r.mutex.RLock()
defer r.mutex.RUnlock()
v, ok := r.suffix[suffix]
if ok {
return v, nil
}
return models.ObjectKindInfo{}, fmt.Errorf("not found")
}
// GetSummaryBuilder returns a builder or nil if not found
func (r *registry) GetKinds() []models.ObjectKindInfo {
r.mutex.RLock()

View File

@ -42,4 +42,10 @@ func TestKindRegistry(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "test", info.Name)
require.True(t, info.IsRaw)
// Get by suffix
info, err = registry.GetFromExtension("png")
require.NoError(t, err)
require.Equal(t, "PNG", info.Name)
require.True(t, info.IsRaw)
}

View File

@ -32,7 +32,7 @@ type RawObjectWithHistory struct {
var (
// increment when RawObject changes
rawObjectVersion = 6
rawObjectVersion = 7
)
func ProvideDummyObjectServer(cfg *setting.Cfg, grpcServerProvider grpcserver.Provider, kinds kind.KindRegistry) object.ObjectStoreServer {
@ -51,18 +51,18 @@ type dummyObjectServer struct {
kinds kind.KindRegistry
}
func namespaceFromUID(uid string) string {
func namespaceFromUID(grn *object.GRN) string {
// TODO
return "orgId-1"
}
func (i *dummyObjectServer) findObject(ctx context.Context, uid string, kind string, version string) (*RawObjectWithHistory, *object.RawObject, error) {
if uid == "" {
return nil, nil, errors.New("UID must not be empty")
func (i *dummyObjectServer) findObject(ctx context.Context, grn *object.GRN, version string) (*RawObjectWithHistory, *object.RawObject, error) {
if grn == nil {
return nil, nil, errors.New("GRN must not be nil")
}
obj, err := i.collection.FindFirst(ctx, namespaceFromUID(uid), func(i *RawObjectWithHistory) (bool, error) {
return i.Object.UID == uid && i.Object.Kind == kind, nil
obj, err := i.collection.FindFirst(ctx, namespaceFromUID(grn), func(i *RawObjectWithHistory) (bool, error) {
return grn.Equals(i.Object.GRN), nil
})
if err != nil {
@ -81,8 +81,7 @@ func (i *dummyObjectServer) findObject(ctx context.Context, uid string, kind str
for _, objVersion := range obj.History {
if objVersion.Version == version {
copy := &object.RawObject{
UID: obj.Object.UID,
Kind: obj.Object.Kind,
GRN: obj.Object.GRN,
Created: obj.Object.Created,
CreatedBy: obj.Object.CreatedBy,
Updated: objVersion.Updated,
@ -102,7 +101,7 @@ func (i *dummyObjectServer) findObject(ctx context.Context, uid string, kind str
}
func (i *dummyObjectServer) Read(ctx context.Context, r *object.ReadObjectRequest) (*object.ReadObjectResponse, error) {
_, objVersion, err := i.findObject(ctx, r.UID, r.Kind, r.Version)
_, objVersion, err := i.findObject(ctx, r.GRN, r.Version)
if err != nil {
return nil, err
}
@ -119,9 +118,9 @@ func (i *dummyObjectServer) Read(ctx context.Context, r *object.ReadObjectReques
}
if r.WithSummary {
// Since we do not store the summary, we can just recreate on demand
builder := i.kinds.GetSummaryBuilder(r.Kind)
builder := i.kinds.GetSummaryBuilder(r.GRN.Kind)
if builder != nil {
summary, _, e2 := builder(ctx, r.UID, objVersion.Body)
summary, _, e2 := builder(ctx, r.GRN.UID, objVersion.Body)
if e2 != nil {
return nil, e2
}
@ -150,15 +149,14 @@ func createContentsHash(contents []byte) string {
}
func (i *dummyObjectServer) update(ctx context.Context, r *object.WriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) {
builder := i.kinds.GetSummaryBuilder(r.Kind)
builder := i.kinds.GetSummaryBuilder(r.GRN.Kind)
if builder == nil {
return nil, fmt.Errorf("unsupported kind: " + r.Kind)
return nil, fmt.Errorf("unsupported kind: " + r.GRN.Kind)
}
rsp := &object.WriteObjectResponse{}
updatedCount, err := i.collection.Update(ctx, namespace, func(i *RawObjectWithHistory) (bool, *RawObjectWithHistory, error) {
match := i.Object.UID == r.UID && i.Object.Kind == r.Kind
if !match {
if !r.GRN.Equals(i.Object.GRN) {
return false, nil, nil
}
@ -174,8 +172,7 @@ func (i *dummyObjectServer) update(ctx context.Context, r *object.WriteObjectReq
modifier := store.UserFromContext(ctx)
updated := &object.RawObject{
UID: r.UID,
Kind: r.Kind,
GRN: r.GRN,
Created: i.Object.Created,
CreatedBy: i.Object.CreatedBy,
Updated: time.Now().Unix(),
@ -218,7 +215,7 @@ func (i *dummyObjectServer) update(ctx context.Context, r *object.WriteObjectReq
}
if updatedCount == 0 && rsp.Object == nil {
return nil, fmt.Errorf("could not find object with uid %s and kind %s", r.UID, r.Kind)
return nil, fmt.Errorf("could not find object: %v", r.GRN)
}
return rsp, nil
@ -227,8 +224,7 @@ func (i *dummyObjectServer) update(ctx context.Context, r *object.WriteObjectReq
func (i *dummyObjectServer) insert(ctx context.Context, r *object.WriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) {
modifier := store.GetUserIDString(store.UserFromContext(ctx))
rawObj := &object.RawObject{
UID: r.UID,
Kind: r.Kind,
GRN: r.GRN,
Updated: time.Now().Unix(),
Created: time.Now().Unix(),
CreatedBy: modifier,
@ -269,12 +265,12 @@ func (i *dummyObjectServer) insert(ctx context.Context, r *object.WriteObjectReq
}
func (i *dummyObjectServer) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) {
namespace := namespaceFromUID(r.UID)
namespace := namespaceFromUID(r.GRN)
obj, err := i.collection.FindFirst(ctx, namespace, func(i *RawObjectWithHistory) (bool, error) {
if i == nil || r == nil {
return false, nil
}
return i.Object.UID == r.UID, nil
return r.GRN.Equals(i.Object.GRN), nil
})
if err != nil {
return nil, err
@ -288,9 +284,8 @@ func (i *dummyObjectServer) Write(ctx context.Context, r *object.WriteObjectRequ
}
func (i *dummyObjectServer) Delete(ctx context.Context, r *object.DeleteObjectRequest) (*object.DeleteObjectResponse, error) {
_, err := i.collection.Delete(ctx, namespaceFromUID(r.UID), func(i *RawObjectWithHistory) (bool, error) {
match := i.Object.UID == r.UID && i.Object.Kind == r.Kind
if match {
_, err := i.collection.Delete(ctx, namespaceFromUID(r.GRN), func(i *RawObjectWithHistory) (bool, error) {
if r.GRN.Equals(i.Object.GRN) {
if r.PreviousVersion != "" && i.Object.Version != r.PreviousVersion {
return false, fmt.Errorf("expected the previous version to be %s, but was %s", r.PreviousVersion, i.Object.Version)
}
@ -311,7 +306,7 @@ func (i *dummyObjectServer) Delete(ctx context.Context, r *object.DeleteObjectRe
}
func (i *dummyObjectServer) History(ctx context.Context, r *object.ObjectHistoryRequest) (*object.ObjectHistoryResponse, error) {
obj, _, err := i.findObject(ctx, r.UID, r.Kind, "")
obj, _, err := i.findObject(ctx, r.GRN, "")
if err != nil {
return nil, err
}
@ -337,9 +332,9 @@ func (i *dummyObjectServer) Search(ctx context.Context, r *object.ObjectSearchRe
}
// TODO more filters
objects, err := i.collection.Find(ctx, namespaceFromUID("TODO"), func(i *RawObjectWithHistory) (bool, error) {
objects, err := i.collection.Find(ctx, namespaceFromUID(&object.GRN{}), func(i *RawObjectWithHistory) (bool, error) {
if len(r.Kind) != 0 {
if _, ok := kindMap[i.Object.Kind]; !ok {
if _, ok := kindMap[i.Object.GRN.Kind]; !ok {
return false, nil
}
}
@ -351,18 +346,17 @@ func (i *dummyObjectServer) Search(ctx context.Context, r *object.ObjectSearchRe
searchResults := make([]*object.ObjectSearchResult, 0)
for _, o := range objects {
builder := i.kinds.GetSummaryBuilder(o.Object.Kind)
builder := i.kinds.GetSummaryBuilder(o.Object.GRN.Kind)
if builder == nil {
continue
}
summary, clean, e2 := builder(ctx, o.Object.UID, o.Object.Body)
summary, clean, e2 := builder(ctx, o.Object.GRN.UID, o.Object.Body)
if e2 != nil {
continue
}
searchResults = append(searchResults, &object.ObjectSearchResult{
UID: o.Object.UID,
Kind: o.Object.Kind,
GRN: o.Object.GRN,
Version: o.Object.Version,
Updated: o.Object.Updated,
UpdatedBy: o.Object.UpdatedBy,

View File

@ -44,6 +44,7 @@ func TestRawObjectWithHistory(t *testing.T) {
raw := &RawObjectWithHistory{
Object: &object.RawObject{
GRN: &object.GRN{UID: "x"},
Version: "A",
Body: body,
},
@ -56,12 +57,31 @@ func TestRawObjectWithHistory(t *testing.T) {
body,
})
b, err := json.Marshal(raw)
b, err := json.MarshalIndent(raw, "", " ")
require.NoError(t, err)
str := string(b)
fmt.Printf("expect: %s", str)
require.JSONEq(t, `{"object":{"UID":"","version":"A","body":{"field":1.23,"hello":"world"}},"history":[{"info":{"version":"B"},"body":"eyJmaWVsZCI6MS4yMywiaGVsbG8iOiJ3b3JsZCJ9"}]}`, str)
//fmt.Printf("expect: %s", str)
require.JSONEq(t, `{
"object": {
"GRN": {
"UID": "x"
},
"version": "A",
"body": {
"field": 1.23,
"hello": "world"
}
},
"history": [
{
"info": {
"version": "B"
},
"body": "eyJmaWVsZCI6MS4yMywiaGVsbG8iOiJ3b3JsZCJ9"
}
]
}`, str)
copy := &ObjectVersionWithBody{}
err = json.Unmarshal(b, copy)

View File

@ -0,0 +1,15 @@
package object
// Check if the two GRNs reference to the same object
// we can not use simple `*x == *b` because of the internal settings
func (x *GRN) Equals(b *GRN) bool {
if b == nil {
return false
}
return x == b || (x.TenantId == b.TenantId &&
x.Scope == b.Scope &&
x.Kind == b.Kind &&
x.UID == b.UID)
}
// TODO: this should interpoerate with the GRN string flavor

View File

@ -77,8 +77,10 @@ func parseRequestParams(req *http.Request) (uid string, kind string, params map[
func (s *httpObjectStore) doGetObject(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
rsp, err := s.store.Read(c.Req.Context(), &ReadObjectRequest{
UID: uid,
Kind: kind,
GRN: &GRN{
UID: uid,
Kind: kind,
},
Version: params["version"], // ?version = XYZ
WithBody: params["body"] != "false", // default to true
WithSummary: params["summary"] == "true", // default to false
@ -110,8 +112,10 @@ func (s *httpObjectStore) doGetObject(c *models.ReqContext) response.Response {
func (s *httpObjectStore) doGetRawObject(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
rsp, err := s.store.Read(c.Req.Context(), &ReadObjectRequest{
UID: uid,
Kind: kind,
GRN: &GRN{
UID: uid,
Kind: kind,
},
Version: params["version"], // ?version = XYZ
WithBody: true,
WithSummary: false,
@ -166,8 +170,10 @@ func (s *httpObjectStore) doWriteObject(c *models.ReqContext) response.Response
}
rsp, err := s.store.Write(c.Req.Context(), &WriteObjectRequest{
UID: uid,
Kind: kind,
GRN: &GRN{
UID: uid,
Kind: kind,
},
Body: b,
Comment: params["comment"],
PreviousVersion: params["previousVersion"],
@ -181,8 +187,10 @@ func (s *httpObjectStore) doWriteObject(c *models.ReqContext) response.Response
func (s *httpObjectStore) doDeleteObject(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
rsp, err := s.store.Delete(c.Req.Context(), &DeleteObjectRequest{
UID: uid,
Kind: kind,
GRN: &GRN{
UID: uid,
Kind: kind,
},
PreviousVersion: params["previousVersion"],
})
if err != nil {
@ -195,8 +203,10 @@ func (s *httpObjectStore) doGetHistory(c *models.ReqContext) response.Response {
uid, kind, params := parseRequestParams(c.Req)
limit := int64(20) // params
rsp, err := s.store.History(c.Req.Context(), &ObjectHistoryRequest{
UID: uid,
Kind: kind,
GRN: &GRN{
UID: uid,
Kind: kind,
},
Limit: limit,
NextPageToken: params["nextPageToken"],
})

View File

@ -46,20 +46,15 @@ func (obj *RawObject) UnmarshalJSON(b []byte) error {
func (codec *rawObjectCodec) IsEmpty(ptr unsafe.Pointer) bool {
f := (*RawObject)(ptr)
return f.UID == "" && f.Body == nil
return f.GRN == nil && f.Body == nil
}
func (codec *rawObjectCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
obj := (*RawObject)(ptr)
stream.WriteObjectStart()
stream.WriteObjectField("UID")
stream.WriteString(obj.UID)
stream.WriteObjectField("GRN")
stream.WriteVal(obj.GRN)
if obj.Kind != "" {
stream.WriteMore()
stream.WriteObjectField("kind")
stream.WriteString(obj.Kind)
}
if obj.Version != "" {
stream.WriteMore()
stream.WriteObjectField("version")
@ -125,10 +120,9 @@ func (codec *rawObjectCodec) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator)
func readRawObject(iter *jsoniter.Iterator, raw *RawObject) {
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
switch l1Field {
case "UID":
raw.UID = iter.ReadString()
case "kind":
raw.Kind = iter.ReadString()
case "GRN":
raw.GRN = &GRN{}
iter.ReadVal(raw.GRN)
case "updated":
raw.Updated = iter.ReadInt64()
case "updatedBy":
@ -211,20 +205,15 @@ func (obj *ObjectSearchResult) MarshalJSON() ([]byte, error) {
func (codec *searchResultCodec) IsEmpty(ptr unsafe.Pointer) bool {
f := (*ObjectSearchResult)(ptr)
return f.UID == "" && f.Body == nil
return f.GRN == nil && f.Body == nil
}
func (codec *searchResultCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
obj := (*ObjectSearchResult)(ptr)
stream.WriteObjectStart()
stream.WriteObjectField("UID")
stream.WriteString(obj.UID)
stream.WriteObjectField("GRN")
stream.WriteVal(obj.GRN)
if obj.Kind != "" {
stream.WriteMore()
stream.WriteObjectField("kind")
stream.WriteString(obj.Kind)
}
if obj.Name != "" {
stream.WriteMore()
stream.WriteObjectField("name")

View File

@ -15,8 +15,10 @@ func TestRawEncoders(t *testing.T) {
require.NoError(t, err)
raw := &RawObject{
UID: "a",
Kind: "b",
GRN: &GRN{
UID: "a",
Kind: "b",
},
Version: "c",
ETag: "d",
Body: body,
@ -26,7 +28,19 @@ func TestRawEncoders(t *testing.T) {
require.NoError(t, err)
str := string(b)
require.JSONEq(t, `{"UID":"a","kind":"b","version":"c","body":{"field":1.23,"hello":"world"},"etag":"d"}`, str)
require.JSONEq(t, `{
"GRN": {
"kind": "b",
"UID": "a"
},
"version": "c",
"body": {
"field": 1.23,
"hello": "world"
},
"etag": "d"
}`, str)
copy := &RawObject{}
err = json.Unmarshal(b, copy)

File diff suppressed because it is too large Load Diff

View File

@ -3,51 +3,65 @@ package object;
option go_package = "./;object";
// The canonical object/document data -- this represents the raw bytes and storage level metadata
message RawObject {
// Unique ID
string UID = 1;
message GRN {
// the tenant/org id
int64 tenant_id = 1;
// mabybe "namespace"? valid values include
// * entity
// * drive
// * service/xyz
string scope = 2;
// Identify the object kind. This kind will be used to apply a schema to the body and
// will trigger additional indexing behavior.
string kind = 2;
string kind = 3;
// Unique ID
string UID = 4;
}
// The canonical object/document data -- this represents the raw bytes and storage level metadata
message RawObject {
// Object identifier
GRN GRN = 1;
// Time in epoch milliseconds that the object was created
int64 created = 3;
int64 created = 2;
// Time in epoch milliseconds that the object was updated
int64 updated = 4;
int64 updated = 3;
// Who created the object
string created_by = 5;
string created_by = 4;
// Who updated the object
string updated_by = 6;
string updated_by = 5;
// Content Length
int64 size = 7;
int64 size = 6;
// MD5 digest of the body
string ETag = 8;
string ETag = 7;
// Raw bytes of the storage object. The kind will determine what is a valid payload
bytes body = 9;
bytes body = 8;
// The version will change when the object is saved. It is not necessarily sortable
//
// NOTE: currently managed by the dashboard+dashboard_version tables
string version = 10;
string version = 9;
// External location info
RawObjectSyncInfo sync = 11;
RawObjectSyncInfo sync = 10;
}
message RawObjectSyncInfo {
// NOTE: currently managed by the dashboard_provisioning table
string source = 11;
string source = 1;
// Time in epoch milliseconds that the object was last synced with an external system (provisioning/git)
int64 time = 12;
int64 time = 2;
}
// Report error while working with objects
@ -91,20 +105,17 @@ message ObjectVersionInfo {
//-----------------------------------------------
message ReadObjectRequest {
// Unique ID (Kind is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string UID = 1;
// Object kind (UID is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string kind = 2;
// Object identifier
GRN GRN = 1;
// Fetch an explicit version
string version = 3;
string version = 2;
// Include the full body bytes
bool with_body = 4;
bool with_body = 3;
// Include derived summary metadata
bool with_summary = 5;
bool with_summary = 4;
}
message ReadObjectResponse {
@ -120,7 +131,7 @@ message ReadObjectResponse {
//------------------------------------------------------
message BatchReadObjectRequest {
repeated ReadObjectRequest batch = 3;
repeated ReadObjectRequest batch = 1;
}
message BatchReadObjectResponse {
@ -132,20 +143,17 @@ message BatchReadObjectResponse {
//-----------------------------------------------
message WriteObjectRequest {
// Unique ID (Kind is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string UID = 1;
// Object kind (UID is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string kind = 2;
// Object identifier
GRN GRN = 1;
// The raw object body
bytes body = 3;
bytes body = 2;
// Message that can be seen when exploring object history
string comment = 4;
string comment = 3;
// Used for optimistic locking. If missing, the previous version will be replaced regardless
string previous_version = 6;
string previous_version = 4;
}
message WriteObjectResponse {
@ -175,11 +183,8 @@ message WriteObjectResponse {
//-----------------------------------------------
message DeleteObjectRequest {
// Unique ID (Kind is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string UID = 1;
// Object kind (UID is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string kind = 2;
// Object identifier
GRN GRN = 1;
// Used for optimistic locking. If missing, the previous version will be replaced regardless
string previous_version = 3;
@ -194,11 +199,8 @@ message DeleteObjectResponse {
//-----------------------------------------------
message ObjectHistoryRequest {
// Unique ID (Kind is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string UID = 1;
// Object kind (UID is also required) NOTE: UID+kind will likely be replaced with GRN that encodes both
string kind = 2;
// Object identifier
GRN GRN = 1;
// Maximum number of items to return
int64 limit = 3;
@ -254,12 +256,8 @@ message ObjectSearchRequest {
// Search result metadata for each object
message ObjectSearchResult {
// Unique ID
string UID = 1;
// Identify the object kind. This kind will be used to apply a schema to the body and
// will trigger additional indexing behavior.
string kind = 2;
// Object identifier
GRN GRN = 1;
// The current veresion of this object
string version = 3;

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.7
// - protoc v3.21.8
// source: object.proto
package object

View File

@ -1,6 +1,7 @@
package object_server_tests
import (
"context"
"testing"
apikeygenprefix "github.com/grafana/grafana/pkg/components/apikeygenprefixed"
@ -8,6 +9,7 @@ import (
"github.com/grafana/grafana/pkg/services/org"
saAPI "github.com/grafana/grafana/pkg/services/serviceaccounts/api"
saTests "github.com/grafana/grafana/pkg/services/serviceaccounts/tests"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/tests/testinfra"
@ -51,6 +53,7 @@ type testContext struct {
authToken string
client object.ObjectStoreClient
user *user.SignedInUser
ctx context.Context
}
func createTestContext(t *testing.T) testContext {
@ -76,5 +79,6 @@ func createTestContext(t *testing.T) testContext {
authToken: authToken,
client: client,
user: serviceAccountUser,
ctx: store.ContextWithUser(context.Background(), serviceAccountUser),
}
}

View File

@ -1,7 +1,6 @@
package object_server_tests
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
@ -20,8 +19,7 @@ func createContentsHash(contents []byte) string {
}
type rawObjectMatcher struct {
uid *string
kind *string
grn *object.GRN
createdRange []time.Time
updatedRange []time.Time
createdBy string
@ -47,12 +45,8 @@ func requireObjectMatch(t *testing.T, obj *object.RawObject, m rawObjectMatcher)
require.NotNil(t, obj)
mismatches := ""
if m.uid != nil && *m.uid != obj.UID {
mismatches += fmt.Sprintf("expected UID: %s, actual UID: %s\n", *m.uid, obj.UID)
}
if m.kind != nil && *m.kind != obj.Kind {
mismatches += fmt.Sprintf("expected kind: %s, actual kind: %s\n", *m.kind, obj.Kind)
if m.grn != nil && !obj.GRN.Equals(m.grn) {
mismatches += fmt.Sprintf("expected: %v, actual: %v\n", m.grn, obj.GRN)
}
if len(m.createdRange) == 2 && !timestampInRange(obj.Created, m.createdRange) {
@ -116,20 +110,22 @@ func TestObjectServer(t *testing.T) {
t.Skip("skipping integration test")
}
ctx := context.Background()
testCtx := createTestContext(t)
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", fmt.Sprintf("Bearer %s", testCtx.authToken))
ctx := metadata.AppendToOutgoingContext(testCtx.ctx, "authorization", fmt.Sprintf("Bearer %s", testCtx.authToken))
fakeUser := fmt.Sprintf("user:%d:%s", testCtx.user.UserID, testCtx.user.Login)
firstVersion := "1"
kind := "dummy"
uid := "my-test-entity"
grn := &object.GRN{
Kind: kind,
UID: "my-test-entity",
Scope: "entity",
}
body := []byte("{\"name\":\"John\"}")
t.Run("should not retrieve non-existent objects", func(t *testing.T) {
resp, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
})
require.NoError(t, err)
@ -140,8 +136,7 @@ func TestObjectServer(t *testing.T) {
t.Run("should be able to read persisted objects", func(t *testing.T) {
before := time.Now()
writeReq := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Body: body,
Comment: "first entity!",
}
@ -157,17 +152,22 @@ func TestObjectServer(t *testing.T) {
requireVersionMatch(t, writeResp.Object, versionMatcher)
readResp, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Version: "",
WithBody: true,
})
require.NoError(t, err)
require.Nil(t, readResp.SummaryJson)
foundGRN := readResp.Object.GRN
require.NotNil(t, foundGRN)
require.NotEqual(t, testCtx.user.OrgID, foundGRN.TenantId) // orgId becomes the tenant id when not set
require.Equal(t, grn.Scope, foundGRN.Scope)
require.Equal(t, grn.Kind, foundGRN.Kind)
require.Equal(t, grn.UID, foundGRN.UID)
objectMatcher := rawObjectMatcher{
uid: &uid,
kind: &kind,
grn: grn,
createdRange: []time.Time{before, time.Now()},
updatedRange: []time.Time{before, time.Now()},
createdBy: fakeUser,
@ -178,16 +178,14 @@ func TestObjectServer(t *testing.T) {
requireObjectMatch(t, readResp.Object, objectMatcher)
deleteResp, err := testCtx.client.Delete(ctx, &object.DeleteObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
PreviousVersion: writeResp.Object.Version,
})
require.NoError(t, err)
require.True(t, deleteResp.OK)
readRespAfterDelete, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Version: "",
WithBody: true,
})
@ -198,8 +196,7 @@ func TestObjectServer(t *testing.T) {
t.Run("should be able to update an object", func(t *testing.T) {
before := time.Now()
writeReq1 := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Body: body,
Comment: "first entity!",
}
@ -210,8 +207,7 @@ func TestObjectServer(t *testing.T) {
body2 := []byte("{\"name\":\"John2\"}")
writeReq2 := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Body: body2,
Comment: "update1",
}
@ -229,8 +225,7 @@ func TestObjectServer(t *testing.T) {
body3 := []byte("{\"name\":\"John3\"}")
writeReq3 := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Body: body3,
Comment: "update3",
}
@ -239,8 +234,7 @@ func TestObjectServer(t *testing.T) {
require.NotEqual(t, writeResp3.Object.Version, writeResp2.Object.Version)
latestMatcher := rawObjectMatcher{
uid: &uid,
kind: &kind,
grn: grn,
createdRange: []time.Time{before, time.Now()},
updatedRange: []time.Time{before, time.Now()},
createdBy: fakeUser,
@ -249,8 +243,7 @@ func TestObjectServer(t *testing.T) {
version: &writeResp3.Object.Version,
}
readRespLatest, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Version: "", // latest
WithBody: true,
})
@ -259,8 +252,7 @@ func TestObjectServer(t *testing.T) {
requireObjectMatch(t, readRespLatest.Object, latestMatcher)
readRespFirstVer, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Version: writeResp1.Object.Version,
WithBody: true,
})
@ -269,8 +261,7 @@ func TestObjectServer(t *testing.T) {
require.Nil(t, readRespFirstVer.SummaryJson)
require.NotNil(t, readRespFirstVer.Object)
requireObjectMatch(t, readRespFirstVer.Object, rawObjectMatcher{
uid: &uid,
kind: &kind,
grn: grn,
createdRange: []time.Time{before, time.Now()},
updatedRange: []time.Time{before, time.Now()},
createdBy: fakeUser,
@ -280,8 +271,7 @@ func TestObjectServer(t *testing.T) {
})
history, err := testCtx.client.History(ctx, &object.ObjectHistoryRequest{
UID: uid,
Kind: kind,
GRN: grn,
})
require.NoError(t, err)
require.Equal(t, []*object.ObjectVersionInfo{
@ -291,8 +281,7 @@ func TestObjectServer(t *testing.T) {
}, history.Versions)
deleteResp, err := testCtx.client.Delete(ctx, &object.DeleteObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
PreviousVersion: writeResp3.Object.Version,
})
require.NoError(t, err)
@ -305,29 +294,34 @@ func TestObjectServer(t *testing.T) {
uid4 := "uid4"
kind2 := "kind2"
w1, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid,
Kind: kind,
GRN: grn,
Body: body,
})
require.NoError(t, err)
w2, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid2,
Kind: kind,
GRN: &object.GRN{
UID: uid2,
Kind: kind,
},
Body: body,
})
require.NoError(t, err)
w3, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid3,
Kind: kind2,
GRN: &object.GRN{
UID: uid3,
Kind: kind2,
},
Body: body,
})
require.NoError(t, err)
w4, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid4,
Kind: kind2,
GRN: &object.GRN{
UID: uid4,
Kind: kind2,
},
Body: body,
})
require.NoError(t, err)
@ -343,8 +337,8 @@ func TestObjectServer(t *testing.T) {
kinds := make([]string, 0, len(search.Results))
version := make([]string, 0, len(search.Results))
for _, res := range search.Results {
uids = append(uids, res.UID)
kinds = append(kinds, res.Kind)
uids = append(uids, res.GRN.UID)
kinds = append(kinds, res.GRN.Kind)
version = append(version, res.Version)
}
require.Equal(t, []string{"my-test-entity", "uid2", "uid3", "uid4"}, uids)
@ -365,8 +359,8 @@ func TestObjectServer(t *testing.T) {
kinds = make([]string, 0, len(searchKind1.Results))
version = make([]string, 0, len(searchKind1.Results))
for _, res := range searchKind1.Results {
uids = append(uids, res.UID)
kinds = append(kinds, res.Kind)
uids = append(uids, res.GRN.UID)
kinds = append(kinds, res.GRN.Kind)
version = append(version, res.Version)
}
require.Equal(t, []string{"my-test-entity", "uid2"}, uids)

View File

@ -0,0 +1,173 @@
package router
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/store/kind"
"github.com/grafana/grafana/pkg/services/store/object"
)
type ResourceRouteInfo struct {
// The resource identifier
GRN *object.GRN
// Raw key used in storage engine
Key string
}
type ObjectStoreRouter interface {
// This will throw exceptions for unsupported
Route(ctx context.Context, grn *object.GRN) (ResourceRouteInfo, error)
// Parse a key to get the GRN and storage information
RouteFromKey(ctx context.Context, key string) (ResourceRouteInfo, error)
}
type standardStoreRouter struct {
kinds kind.KindRegistry
}
func NewObjectStoreRouter(kinds kind.KindRegistry) ObjectStoreRouter {
return &standardStoreRouter{kinds: kinds}
}
var _ ObjectStoreRouter = &standardStoreRouter{}
func (r *standardStoreRouter) Route(ctx context.Context, grn *object.GRN) (ResourceRouteInfo, error) {
info := ResourceRouteInfo{
GRN: grn,
}
if grn == nil {
return info, fmt.Errorf("missing GRN")
}
// Make sure the orgID is set
if grn.TenantId < 1 {
return info, fmt.Errorf("missing TenantId")
}
if grn.Kind == "" {
return info, fmt.Errorf("missing Kind")
}
if grn.UID == "" {
return info, fmt.Errorf("missing UID")
}
kind, err := r.kinds.GetInfo(grn.Kind)
if err != nil {
return info, fmt.Errorf("unknown Kind: " + grn.Kind)
}
if grn.Scope == "" {
return info, fmt.Errorf("missing Scope")
}
switch grn.Scope {
case models.ObjectStoreScopeEntity:
{
info.Key = fmt.Sprintf("%d/%s/%s/%s", grn.TenantId, grn.Scope, grn.Kind, grn.UID)
}
case models.ObjectStoreScopeDrive:
{
// Special folder handling in drive
if grn.Kind == models.StandardKindFolder {
info.Key = fmt.Sprintf("%d/%s/%s/__folder.json", grn.TenantId, grn.Scope, grn.UID)
return info, nil
}
if kind.FileExtension != "" {
info.Key = fmt.Sprintf("%d/%s/%s.%s", grn.TenantId, grn.Scope, grn.UID, kind.FileExtension)
} else {
info.Key = fmt.Sprintf("%d/%s/%s-%s.json", grn.TenantId, grn.Scope, grn.UID, grn.Kind)
}
}
default:
return info, fmt.Errorf("unsupported scope: " + grn.Scope)
}
return info, nil
}
func (r *standardStoreRouter) RouteFromKey(ctx context.Context, key string) (ResourceRouteInfo, error) {
info := ResourceRouteInfo{
Key: key,
GRN: &object.GRN{},
}
// {orgID}/{scope}/....
idx := strings.Index(key, "/")
if idx <= 0 {
return info, fmt.Errorf("can not find orgID")
}
p0 := key[:idx]
key = key[idx+1:]
idx = strings.Index(key, "/")
if idx <= 0 {
return info, fmt.Errorf("can not find namespace")
}
p2 := key[:idx]
key = key[idx+1:]
tenantID, err := strconv.ParseInt(p0, 10, 64)
if err != nil {
return info, fmt.Errorf("error parsing orgID")
}
info.GRN.TenantId = tenantID
info.GRN.Scope = p2
switch info.GRN.Scope {
case models.ObjectStoreScopeDrive:
{
idx := strings.LastIndex(key, ".")
if idx > 0 {
ext := key[idx+1:]
if ext == "json" {
sdx := strings.LastIndex(key, "/")
idx = strings.LastIndex(key, "-")
if idx > sdx {
ddx := strings.LastIndex(key, ".") // .json
info.GRN.UID = key[:idx]
info.GRN.Kind = key[idx+1 : ddx]
} else {
switch key[sdx+1:] {
case "__folder.json":
{
info.GRN.UID = key[:sdx]
info.GRN.Kind = models.StandardKindFolder
}
default:
return info, fmt.Errorf("unable to parse drive path")
}
}
} else {
info.GRN.UID = key[:idx]
k, err := r.kinds.GetFromExtension(ext)
if err != nil {
return info, err
}
info.GRN.Kind = k.ID
}
} else {
idx = strings.Index(key, "/")
info.GRN.Kind = key[:idx]
info.GRN.UID = key[idx+1:]
}
}
case models.ObjectStoreScopeEntity:
{
idx = strings.Index(key, "/")
info.GRN.Kind = key[:idx]
info.GRN.UID = key[idx+1:]
}
default:
return info, fmt.Errorf("unsupported scope")
}
return info, nil
}

View File

@ -0,0 +1,95 @@
package router
import (
"context"
"fmt"
"testing"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/store/kind"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/stretchr/testify/require"
)
func TestSimpleRouter(t *testing.T) {
ctx := context.Background()
router := &standardStoreRouter{
kinds: kind.NewKindRegistry(),
}
info, err := router.Route(ctx, &object.GRN{
UID: "path/to/file",
})
require.Error(t, err) // needs OrgID
type routeScenario struct {
GRN *object.GRN
Error string
Key string
}
scenarios := []routeScenario{{
Error: "missing TenantId",
GRN: &object.GRN{Scope: "x"},
}, {
Error: "unknown Kind: xyz",
GRN: &object.GRN{
TenantId: 11,
Scope: models.ObjectStoreScopeDrive,
UID: "path/to/file",
Kind: "xyz",
},
}, {
Key: "11/drive/path/to/file-dashboard.json",
GRN: &object.GRN{
TenantId: 11,
Scope: models.ObjectStoreScopeDrive,
UID: "path/to/file",
Kind: "dashboard",
},
}, {
Key: "11/drive/path/to/folder/__folder.json",
GRN: &object.GRN{
TenantId: 11,
Scope: models.ObjectStoreScopeDrive,
UID: "path/to/folder",
Kind: "folder",
},
}, {
Key: "10/drive/path/to/file.png",
GRN: &object.GRN{
TenantId: 10,
Scope: models.ObjectStoreScopeDrive,
UID: "path/to/file",
Kind: "png",
},
}, {
Key: "10/entity/playlist/aaaaa", // ?.json better or not?
GRN: &object.GRN{
TenantId: 10,
Scope: models.ObjectStoreScopeEntity,
UID: "aaaaa",
Kind: "playlist",
},
}}
for idx, check := range scenarios {
testID := fmt.Sprintf("[%d] %s", idx, check.Key)
// Read the key from the GRN
info, err = router.Route(ctx, check.GRN)
if check.Error == "" {
require.NoError(t, err, testID)
} else {
require.Error(t, err, testID)
require.Equal(t, check.Error, err.Error(), testID)
continue
}
// Check that the key matched
require.Equal(t, check.Key, info.Key, testID)
// Now try to parse the same key again
out, err := router.RouteFromKey(ctx, info.Key)
require.NoError(t, err, testID)
require.Equal(t, check.GRN, out.GRN, testID)
}
}