Storage: Dummy object server and basic integration tests (#56014)

* object extractors

* update bluge to use summary values

* gosec

* move to store/object package

* references

* references

* references

* same thign but with protobuf

* now the service

* now with summary

* now with summary

* from protobuf

* from protobuf

* cleanup

* remove hand crafted file

* update proto definitions

* update comments

* remove properties

* remove properties

* re-generate

* add batch

* move ref to raw struct

* GRPC test infra

* fix merge

* add delete

* lint

* rename to dummyobjectserver

* update comment

* refactor collection, simplify dummy server

* update

* refactor test structure

* more tests

* more tests

* replace collection with infra/persistentcollection

* skip if not integration test suite

* very important lint fix

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
This commit is contained in:
Artur Wierzbicki
2022-09-30 23:56:07 +04:00
committed by GitHub
parent 268a49cb38
commit 85b965cbec
5 changed files with 716 additions and 2 deletions

View File

@@ -0,0 +1,307 @@
package objectdummyserver
import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"strconv"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/x/persistentcollection"
"github.com/grafana/grafana/pkg/services/grpcserver"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
)
type RawObjectWithHistory struct {
*object.RawObject `json:"rawObject,omitempty"`
History []*object.RawObject `json:"history,omitempty"`
}
var (
// increment when RawObject changes
rawObjectVersion = 1
)
func ProvideDummyObjectServer(cfg *setting.Cfg, grpcServerProvider grpcserver.Provider) object.ObjectStoreServer {
objectServer := &dummyObjectServer{
collection: persistentcollection.NewLocalFSPersistentCollection[*RawObjectWithHistory]("raw-object", cfg.DataPath, rawObjectVersion),
log: log.New("in-memory-object-server"),
}
object.RegisterObjectStoreServer(grpcServerProvider.GetServer(), objectServer)
return objectServer
}
type dummyObjectServer struct {
log log.Logger
collection persistentcollection.PersistentCollection[*RawObjectWithHistory]
}
func namespaceFromUID(uid string) string {
// TODO
return "orgId-1"
}
func userFromContext(ctx context.Context) *user.SignedInUser {
// TODO implement in GRPC server
return &user.SignedInUser{
UserID: 1,
OrgID: 1,
Login: "fake",
}
}
func (i dummyObjectServer) findObject(ctx context.Context, uid string, kind string, version string) (*RawObjectWithHistory, *object.RawObject, error) {
if uid == "" {
return nil, nil, errors.New("UID must not be empty")
}
obj, err := i.collection.FindFirst(ctx, namespaceFromUID(uid), func(i *RawObjectWithHistory) (bool, error) {
return i.UID == uid && i.Kind == kind, nil
})
if err != nil {
return nil, nil, err
}
if obj == nil {
return nil, nil, nil
}
getLatestVersion := version == ""
if getLatestVersion {
objVersion := obj.History[len(obj.History)-1]
return obj, objVersion, nil
}
for _, objVersion := range obj.History {
if objVersion.Version == version {
return obj, objVersion, nil
}
}
return obj, nil, nil
}
func (i dummyObjectServer) Read(ctx context.Context, r *object.ReadObjectRequest) (*object.ReadObjectResponse, error) {
_, objVersion, err := i.findObject(ctx, r.UID, r.Kind, r.Version)
if err != nil {
return nil, err
}
if objVersion == nil {
return &object.ReadObjectResponse{
Object: nil,
SummaryJson: nil,
}, nil
}
return &object.ReadObjectResponse{
Object: objVersion,
SummaryJson: nil,
}, nil
}
func (i dummyObjectServer) BatchRead(ctx context.Context, batchR *object.BatchReadObjectRequest) (*object.BatchReadObjectResponse, error) {
results := make([]*object.ReadObjectResponse, 0)
for _, r := range batchR.Batch {
resp, err := i.Read(ctx, r)
if err != nil {
return nil, err
}
results = append(results, resp)
}
return &object.BatchReadObjectResponse{Results: results}, nil
}
func createContentsHash(contents []byte) string {
hash := md5.Sum(contents)
return hex.EncodeToString(hash[:])
}
func (i dummyObjectServer) update(ctx context.Context, r *object.WriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) {
var updated *object.RawObject
updatedCount, err := i.collection.Update(ctx, namespace, func(i *RawObjectWithHistory) (bool, *RawObjectWithHistory, error) {
match := i.UID == r.UID && i.Kind == r.Kind
if !match {
return false, nil, nil
}
if r.PreviousVersion != "" && i.Version != r.PreviousVersion {
return false, nil, fmt.Errorf("expected the previous version to be %s, but was %s", r.PreviousVersion, i.Version)
}
prevVersion, err := strconv.Atoi(i.Version)
if err != nil {
return false, nil, err
}
modifier := userFromContext(ctx)
updated = &object.RawObject{
UID: r.UID,
Kind: r.Kind,
Created: i.Created,
CreatedBy: i.CreatedBy,
Modified: time.Now().Unix(),
ModifiedBy: &object.UserInfo{
Id: modifier.UserID,
Login: modifier.Login,
},
Size: int64(len(r.Body)),
ETag: createContentsHash(r.Body),
Body: r.Body,
Version: fmt.Sprintf("%d", prevVersion+1),
Comment: r.Comment,
}
return true, &RawObjectWithHistory{
RawObject: updated,
History: append(i.History, updated),
}, nil
})
if err != nil {
return nil, err
}
if updatedCount == 0 {
return nil, fmt.Errorf("could not find object with uid %s and kind %s", r.UID, r.Kind)
}
return &object.WriteObjectResponse{
Error: nil,
Object: updated,
}, nil
}
func (i dummyObjectServer) insert(ctx context.Context, r *object.WriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) {
modifier := userFromContext(ctx)
rawObj := &object.RawObject{
UID: r.UID,
Kind: r.Kind,
Modified: time.Now().Unix(),
Created: time.Now().Unix(),
CreatedBy: &object.UserInfo{
Id: modifier.UserID,
Login: modifier.Login,
},
ModifiedBy: &object.UserInfo{
Id: modifier.UserID,
Login: modifier.Login,
},
Size: int64(len(r.Body)),
ETag: createContentsHash(r.Body),
Body: r.Body,
Version: fmt.Sprintf("%d", 1),
Comment: r.Comment,
}
newObj := &RawObjectWithHistory{
RawObject: rawObj,
History: []*object.RawObject{rawObj},
}
err := i.collection.Insert(ctx, namespace, newObj)
if err != nil {
return nil, err
}
return &object.WriteObjectResponse{
Error: nil,
Object: newObj.RawObject,
}, nil
}
func (i dummyObjectServer) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) {
namespace := namespaceFromUID(r.UID)
obj, err := i.collection.FindFirst(ctx, namespace, func(i *RawObjectWithHistory) (bool, error) {
return i.UID == r.UID, nil
})
if err != nil {
return nil, err
}
if obj == nil {
return i.insert(ctx, r, namespace)
}
return i.update(ctx, r, namespace)
}
func (i dummyObjectServer) Delete(ctx context.Context, r *object.DeleteObjectRequest) (*object.DeleteObjectResponse, error) {
_, err := i.collection.Delete(ctx, namespaceFromUID(r.UID), func(i *RawObjectWithHistory) (bool, error) {
match := i.UID == r.UID && i.Kind == r.Kind
if match {
if r.PreviousVersion != "" && i.Version != r.PreviousVersion {
return false, fmt.Errorf("expected the previous version to be %s, but was %s", r.PreviousVersion, i.Version)
}
return true, nil
}
return false, nil
})
if err != nil {
return nil, err
}
return &object.DeleteObjectResponse{
OK: true,
}, nil
}
func (i dummyObjectServer) History(ctx context.Context, r *object.ObjectHistoryRequest) (*object.ObjectHistoryResponse, error) {
obj, _, err := i.findObject(ctx, r.UID, r.Kind, "")
if err != nil {
return nil, err
}
if obj == nil {
return &object.ObjectHistoryResponse{
Object: nil,
}, nil
}
return &object.ObjectHistoryResponse{
Object: obj.History,
}, nil
}
func (i dummyObjectServer) Search(ctx context.Context, r *object.ObjectSearchRequest) (*object.ObjectSearchResponse, error) {
var kindMap map[string]bool
if len(r.Kind) != 0 {
kindMap = make(map[string]bool)
for _, k := range r.Kind {
kindMap[k] = true
}
}
// TODO more filters
objects, err := i.collection.Find(ctx, namespaceFromUID("TODO"), func(i *RawObjectWithHistory) (bool, error) {
if len(r.Kind) != 0 {
if _, ok := kindMap[i.Kind]; !ok {
return false, nil
}
}
return true, nil
})
if err != nil {
return nil, err
}
rawObjects := make([]*object.RawObject, 0)
for _, o := range objects {
rawObjects = append(rawObjects, o.RawObject)
}
return &object.ObjectSearchResponse{
Results: rawObjects,
}, nil
}

View File

@@ -0,0 +1,71 @@
package object_server_tests
import (
"testing"
apikeygenprefix "github.com/grafana/grafana/pkg/components/apikeygenprefixed"
"github.com/grafana/grafana/pkg/server"
"github.com/grafana/grafana/pkg/services/org"
saAPI "github.com/grafana/grafana/pkg/services/serviceaccounts/api"
saTests "github.com/grafana/grafana/pkg/services/serviceaccounts/tests"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/grafana/grafana/pkg/tests/testinfra"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func createServiceAccountAdminToken(t *testing.T, env *server.TestEnv) string {
t.Helper()
account := saTests.SetupUserServiceAccount(t, env.SQLStore, saTests.TestUser{
Name: "grpc-server-sa",
Role: string(org.RoleAdmin),
Login: "grpc-server-sa",
IsServiceAccount: true,
OrgID: 1,
})
keyGen, err := apikeygenprefix.New(saAPI.ServiceID)
require.NoError(t, err)
_ = saTests.SetupApiKey(t, env.SQLStore, saTests.TestApiKey{
Name: "grpc-server-test",
Role: org.RoleAdmin,
OrgId: account.OrgID,
Key: keyGen.HashedKey,
ServiceAccountID: &account.ID,
})
return keyGen.ClientSecret
}
type testContext struct {
authToken string
client object.ObjectStoreClient
}
func createTestContext(t *testing.T) testContext {
t.Helper()
dir, path := testinfra.CreateGrafDir(t, testinfra.GrafanaOpts{
EnableFeatureToggles: []string{"grpcServer"},
GRPCServerAddress: "127.0.0.1:0", // :0 for choosing the port automatically
})
_, env := testinfra.StartGrafanaEnv(t, dir, path)
authToken := createServiceAccountAdminToken(t, env)
conn, err := grpc.Dial(
env.GRPCServer.GetAddress(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
client := object.NewObjectStoreClient(conn)
return testContext{
authToken: authToken,
client: client,
}
}

View File

@@ -0,0 +1,334 @@
package object_server_tests
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"reflect"
"strings"
"testing"
"time"
"github.com/grafana/grafana/pkg/services/store/object"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)
func createContentsHash(contents []byte) string {
hash := md5.Sum(contents)
return hex.EncodeToString(hash[:])
}
type rawObjectMatcher struct {
uid *string
kind *string
createdRange []time.Time
modifiedRange []time.Time
createdBy *object.UserInfo
modifiedBy *object.UserInfo
body []byte
version *string
comment *string
}
func userInfoMatches(expected *object.UserInfo, actual *object.UserInfo) (bool, string) {
var mismatches []string
if expected.Id != actual.Id {
mismatches = append(mismatches, fmt.Sprintf("expected ID %d, actual ID: %d", expected.Id, actual.Id))
}
if expected.Login != actual.Login {
mismatches = append(mismatches, fmt.Sprintf("expected login %s, actual login: %s", expected.Login, actual.Login))
}
return len(mismatches) == 0, strings.Join(mismatches, ", ")
}
func timestampInRange(ts int64, tsRange []time.Time) bool {
return ts >= tsRange[0].Unix() && ts <= tsRange[1].Unix()
}
func requireObjectMatch(t *testing.T, obj *object.RawObject, m rawObjectMatcher) {
t.Helper()
mismatches := ""
if m.uid != nil && *m.uid != obj.UID {
mismatches += fmt.Sprintf("expected UID: %s, actual UID: %s\n", *m.uid, obj.UID)
}
if m.kind != nil && *m.kind != obj.Kind {
mismatches += fmt.Sprintf("expected kind: %s, actual kind: %s\n", *m.kind, obj.Kind)
}
if len(m.createdRange) == 2 && !timestampInRange(obj.Created, m.createdRange) {
mismatches += fmt.Sprintf("expected createdBy range: [from %s to %s], actual created: %s\n", m.createdRange[0], m.createdRange[1], time.Unix(obj.Created, 0))
}
if len(m.modifiedRange) == 2 && !timestampInRange(obj.Modified, m.modifiedRange) {
mismatches += fmt.Sprintf("expected createdBy range: [from %s to %s], actual created: %s\n", m.createdRange[0], m.createdRange[1], time.Unix(obj.Created, 0))
}
if m.createdBy != nil {
userInfoMatches, msg := userInfoMatches(m.createdBy, obj.CreatedBy)
if !userInfoMatches {
mismatches += fmt.Sprintf("createdBy: %s\n", msg)
}
}
if m.modifiedBy != nil {
userInfoMatches, msg := userInfoMatches(m.modifiedBy, obj.ModifiedBy)
if !userInfoMatches {
mismatches += fmt.Sprintf("modifiedBy: %s\n", msg)
}
}
if !reflect.DeepEqual(m.body, obj.Body) {
mismatches += fmt.Sprintf("expected body len: %d, actual body len: %d\n", len(m.body), len(obj.Body))
}
expectedHash := createContentsHash(m.body)
actualHash := createContentsHash(obj.Body)
if expectedHash != actualHash {
mismatches += fmt.Sprintf("expected body hash: %s, actual body hash: %s\n", expectedHash, actualHash)
}
if m.version != nil && *m.version != obj.Version {
mismatches += fmt.Sprintf("expected version: %s, actual version: %s\n", *m.version, obj.Version)
}
if m.comment != nil && *m.comment != obj.Comment {
mismatches += fmt.Sprintf("expected comment: %s, actual comment: %s\n", *m.comment, obj.Comment)
}
require.True(t, len(mismatches) == 0, mismatches)
}
func TestObjectServer(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
ctx := context.Background()
testCtx := createTestContext(t)
ctx = metadata.AppendToOutgoingContext(ctx, "authorization", fmt.Sprintf("Bearer %s", testCtx.authToken))
fakeUser := &object.UserInfo{
Login: "fake",
Id: 1,
}
firstVersion := "1"
kind := "dashboard"
uid := "my-test-entity"
body := []byte("{\"name\":\"John\"}")
t.Run("should not retrieve non-existent objects", func(t *testing.T) {
resp, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
})
require.NoError(t, err)
require.NotNil(t, resp)
require.Nil(t, resp.Object)
})
t.Run("should be able to read persisted objects", func(t *testing.T) {
before := time.Now()
writeReq := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
Body: body,
Comment: "first entity!",
}
writeResp, err := testCtx.client.Write(ctx, writeReq)
require.NoError(t, err)
objectMatcher := rawObjectMatcher{
uid: &uid,
kind: &kind,
createdRange: []time.Time{before, time.Now()},
modifiedRange: []time.Time{before, time.Now()},
createdBy: fakeUser,
modifiedBy: fakeUser,
body: body,
version: &firstVersion,
comment: &writeReq.Comment,
}
requireObjectMatch(t, writeResp.Object, objectMatcher)
readResp, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
Version: "",
WithBody: true,
})
require.NoError(t, err)
require.Nil(t, readResp.SummaryJson)
requireObjectMatch(t, writeResp.Object, objectMatcher)
deleteResp, err := testCtx.client.Delete(ctx, &object.DeleteObjectRequest{
UID: uid,
Kind: kind,
PreviousVersion: writeResp.Object.Version,
})
require.NoError(t, err)
require.True(t, deleteResp.OK)
readRespAfterDelete, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
Version: "",
WithBody: true,
})
require.NoError(t, err)
require.Nil(t, readRespAfterDelete.Object)
})
t.Run("should be able to update an object", func(t *testing.T) {
before := time.Now()
writeReq1 := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
Body: body,
Comment: "first entity!",
}
writeResp1, err := testCtx.client.Write(ctx, writeReq1)
require.NoError(t, err)
body2 := []byte("{\"name\":\"John2\"}")
writeReq2 := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
Body: body2,
Comment: "update1",
}
writeResp2, err := testCtx.client.Write(ctx, writeReq2)
require.NoError(t, err)
require.NotEqual(t, writeResp1.Object.Version, writeResp2.Object.Version)
body3 := []byte("{\"name\":\"John3\"}")
writeReq3 := &object.WriteObjectRequest{
UID: uid,
Kind: kind,
Body: body3,
Comment: "update3",
}
writeResp3, err := testCtx.client.Write(ctx, writeReq3)
require.NoError(t, err)
require.NotEqual(t, writeResp3.Object.Version, writeResp2.Object.Version)
latestMatcher := rawObjectMatcher{
uid: &uid,
kind: &kind,
createdRange: []time.Time{before, time.Now()},
modifiedRange: []time.Time{before, time.Now()},
createdBy: fakeUser,
modifiedBy: fakeUser,
body: body3,
version: &writeResp3.Object.Version,
comment: &writeReq3.Comment,
}
readRespLatest, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
Version: "", // latest
WithBody: true,
})
require.NoError(t, err)
require.Nil(t, readRespLatest.SummaryJson)
requireObjectMatch(t, readRespLatest.Object, latestMatcher)
readRespFirstVer, err := testCtx.client.Read(ctx, &object.ReadObjectRequest{
UID: uid,
Kind: kind,
Version: writeResp1.Object.Version,
WithBody: true,
})
require.NoError(t, err)
require.Nil(t, readRespFirstVer.SummaryJson)
require.NotNil(t, readRespFirstVer.Object)
requireObjectMatch(t, readRespFirstVer.Object, rawObjectMatcher{
uid: &uid,
kind: &kind,
createdRange: []time.Time{before, time.Now()},
modifiedRange: []time.Time{before, time.Now()},
createdBy: fakeUser,
modifiedBy: fakeUser,
body: body,
version: &firstVersion,
comment: &writeReq1.Comment,
})
history, err := testCtx.client.History(ctx, &object.ObjectHistoryRequest{
UID: uid,
Kind: kind,
})
require.NoError(t, err)
require.Equal(t, []*object.RawObject{
writeResp1.Object,
writeResp2.Object,
writeResp3.Object,
}, history.Object)
deleteResp, err := testCtx.client.Delete(ctx, &object.DeleteObjectRequest{
UID: uid,
Kind: kind,
PreviousVersion: writeResp3.Object.Version,
})
require.NoError(t, err)
require.True(t, deleteResp.OK)
})
t.Run("should be able to search for objects", func(t *testing.T) {
uid2 := "uid2"
uid3 := "uid3"
uid4 := "uid4"
kind2 := "kind2"
w1, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid,
Kind: kind,
Body: body,
})
require.NoError(t, err)
w2, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid2,
Kind: kind,
Body: body,
})
require.NoError(t, err)
w3, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid3,
Kind: kind2,
Body: body,
})
require.NoError(t, err)
w4, err := testCtx.client.Write(ctx, &object.WriteObjectRequest{
UID: uid4,
Kind: kind2,
Body: body,
})
require.NoError(t, err)
search, err := testCtx.client.Search(ctx, &object.ObjectSearchRequest{
Kind: []string{kind, kind2},
})
require.NoError(t, err)
require.Equal(t, []*object.RawObject{
w1.Object, w2.Object, w3.Object, w4.Object,
}, search.Results)
searchKind1, err := testCtx.client.Search(ctx, &object.ObjectSearchRequest{
Kind: []string{kind},
})
require.NoError(t, err)
require.Equal(t, []*object.RawObject{
w1.Object, w2.Object,
}, searchKind1.Results)
})
}