Storage: add an admin write flavor that can explicitly set the user/time (#58618)

This commit is contained in:
Ryan McKinley 2022-11-12 11:36:18 -08:00 committed by GitHub
parent 69b5a9c752
commit 5934407443
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 845 additions and 369 deletions

View File

@ -123,14 +123,21 @@ func (e *objectStoreJob) start(ctx context.Context) {
rowUser.UserID = 0 // avoid Uint64Val issue???? rowUser.UserID = 0 // avoid Uint64Val issue????
} }
_, err = e.store.Write(ctx, &object.WriteObjectRequest{ _, err = e.store.AdminWrite(ctx, &object.AdminWriteObjectRequest{
GRN: &object.GRN{ GRN: &object.GRN{
Scope: models.ObjectStoreScopeEntity, Scope: models.ObjectStoreScopeEntity,
UID: dash.UID, UID: dash.UID,
Kind: models.StandardKindDashboard, Kind: models.StandardKindDashboard,
}, },
Body: dash.Body, ClearHistory: true,
Comment: "export from dashboard table", Version: fmt.Sprintf("%d", dash.Version),
CreatedAt: dash.Created.UnixMilli(),
UpdatedAt: dash.Updated.UnixMilli(),
UpdatedBy: fmt.Sprintf("user:%d", dash.UpdatedBy),
CreatedBy: fmt.Sprintf("user:%d", dash.CreatedBy),
Origin: "export-from-sql",
Body: dash.Data,
Comment: "(exported from SQL)",
}) })
if err != nil { if err != nil {
e.status.Status = "error: " + err.Error() e.status.Status = "error: " + err.Error()
@ -254,34 +261,25 @@ func (e *objectStoreJob) start(ctx context.Context) {
} }
type dashInfo struct { type dashInfo struct {
OrgID int64 OrgID int64 `db:"org_id"`
UID string UID string
Body []byte Version int64
UpdatedBy int64 Slug string
Data []byte
Created time.Time
Updated time.Time
CreatedBy int64 `db:"created_by"`
UpdatedBy int64 `db:"updated_by"`
} }
// TODO, paging etc
func (e *objectStoreJob) getDashboards(ctx context.Context) ([]dashInfo, error) { func (e *objectStoreJob) getDashboards(ctx context.Context) ([]dashInfo, error) {
e.status.Last = "find dashbaords...." e.status.Last = "find dashbaords...."
e.broadcaster(e.status) e.broadcaster(e.status)
dash := make([]dashInfo, 0) dash := make([]dashInfo, 0)
rows, err := e.sess.Query(ctx, "SELECT org_id,uid,data,updated_by FROM dashboard WHERE is_folder=false") err := e.sess.Select(ctx, &dash, "SELECT org_id,uid,version,slug,data,created,updated,created_by,updated_by FROM dashboard WHERE is_folder=false")
if err != nil { return dash, err
return nil, err
}
for rows.Next() {
if e.stopRequested {
return dash, nil
}
row := dashInfo{}
err = rows.Scan(&row.OrgID, &row.UID, &row.Body, &row.UpdatedBy)
if err != nil {
return nil, err
}
dash = append(dash, row)
}
return dash, nil
} }
func (e *objectStoreJob) getStatus() ExportStatus { func (e *objectStoreJob) getStatus() ExportStatus {

View File

@ -46,9 +46,9 @@ func addObjectStorageMigrations(mg *migrator.Migrator) {
{Name: "updated_by", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, {Name: "updated_by", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "created_by", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, {Name: "created_by", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
// For objects that are synchronized from an external source (ie provisioning or git) // Mark objects with origin metadata
{Name: "sync_src", Type: migrator.DB_Text, Nullable: true}, {Name: "origin", Type: migrator.DB_Text, Nullable: true},
{Name: "sync_time", Type: migrator.DB_BigInt, Nullable: true}, {Name: "origin_ts", Type: migrator.DB_BigInt, Nullable: false},
// Summary data (always extracted from the `body` column) // Summary data (always extracted from the `body` column)
{Name: "name", Type: migrator.DB_NVarchar, Length: 255, Nullable: false}, {Name: "name", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
@ -134,7 +134,7 @@ func addObjectStorageMigrations(mg *migrator.Migrator) {
// Migration cleanups: given that this is a complex setup // Migration cleanups: given that this is a complex setup
// that requires a lot of testing before we are ready to push out of dev // that requires a lot of testing before we are ready to push out of dev
// this script lets us easy wipe previous changes and initialize clean tables // this script lets us easy wipe previous changes and initialize clean tables
suffix := " (v2)" // change this when we want to wipe and reset the object tables suffix := " (v5)" // change this when we want to wipe and reset the object tables
mg.AddMigration("ObjectStore init: cleanup"+suffix, migrator.NewRawSQLMigration(strings.TrimSpace(` mg.AddMigration("ObjectStore init: cleanup"+suffix, migrator.NewRawSQLMigration(strings.TrimSpace(`
DELETE FROM migration_log WHERE migration_id LIKE 'ObjectStore init%'; DELETE FROM migration_log WHERE migration_id LIKE 'ObjectStore init%';
`))) `)))

View File

@ -35,6 +35,10 @@ var (
rawObjectVersion = 9 rawObjectVersion = 9
) )
// Make sure we implement both store + admin
var _ object.ObjectStoreServer = &dummyObjectServer{}
var _ object.ObjectStoreAdminServer = &dummyObjectServer{}
func ProvideDummyObjectServer(cfg *setting.Cfg, grpcServerProvider grpcserver.Provider, kinds kind.KindRegistry) object.ObjectStoreServer { func ProvideDummyObjectServer(cfg *setting.Cfg, grpcServerProvider grpcserver.Provider, kinds kind.KindRegistry) object.ObjectStoreServer {
objectServer := &dummyObjectServer{ objectServer := &dummyObjectServer{
collection: persistentcollection.NewLocalFSPersistentCollection[*RawObjectWithHistory]("raw-object", cfg.DataPath, rawObjectVersion), collection: persistentcollection.NewLocalFSPersistentCollection[*RawObjectWithHistory]("raw-object", cfg.DataPath, rawObjectVersion),
@ -149,7 +153,7 @@ func createContentsHash(contents []byte) string {
return hex.EncodeToString(hash[:]) return hex.EncodeToString(hash[:])
} }
func (i *dummyObjectServer) update(ctx context.Context, r *object.WriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) { func (i *dummyObjectServer) update(ctx context.Context, r *object.AdminWriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) {
builder := i.kinds.GetSummaryBuilder(r.GRN.Kind) builder := i.kinds.GetSummaryBuilder(r.GRN.Kind)
if builder == nil { if builder == nil {
return nil, fmt.Errorf("unsupported kind: " + r.GRN.Kind) return nil, fmt.Errorf("unsupported kind: " + r.GRN.Kind)
@ -222,7 +226,7 @@ func (i *dummyObjectServer) update(ctx context.Context, r *object.WriteObjectReq
return rsp, nil return rsp, nil
} }
func (i *dummyObjectServer) insert(ctx context.Context, r *object.WriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) { func (i *dummyObjectServer) insert(ctx context.Context, r *object.AdminWriteObjectRequest, namespace string) (*object.WriteObjectResponse, error) {
modifier := store.GetUserIDString(store.UserFromContext(ctx)) modifier := store.GetUserIDString(store.UserFromContext(ctx))
rawObj := &object.RawObject{ rawObj := &object.RawObject{
GRN: r.GRN, GRN: r.GRN,
@ -266,6 +270,15 @@ func (i *dummyObjectServer) insert(ctx context.Context, r *object.WriteObjectReq
} }
func (i *dummyObjectServer) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) { func (i *dummyObjectServer) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) {
return i.doWrite(ctx, object.ToAdminWriteObjectRequest(r))
}
func (i *dummyObjectServer) AdminWrite(ctx context.Context, r *object.AdminWriteObjectRequest) (*object.WriteObjectResponse, error) {
// Check permissions?
return i.doWrite(ctx, r)
}
func (i *dummyObjectServer) doWrite(ctx context.Context, r *object.AdminWriteObjectRequest) (*object.WriteObjectResponse, error) {
grn := getFullGRN(ctx, r.GRN) grn := getFullGRN(ctx, r.GRN)
namespace := namespaceFromUID(grn) namespace := namespaceFromUID(grn)
obj, err := i.collection.FindFirst(ctx, namespace, func(i *RawObjectWithHistory) (bool, error) { obj, err := i.collection.FindFirst(ctx, namespace, func(i *RawObjectWithHistory) (bool, error) {

View File

@ -7,12 +7,20 @@ import (
"github.com/grafana/grafana/pkg/services/store/object" "github.com/grafana/grafana/pkg/services/store/object"
) )
// Make sure we implement both store + admin
var _ object.ObjectStoreServer = &fakeObjectStore{}
var _ object.ObjectStoreAdminServer = &fakeObjectStore{}
func ProvideFakeObjectServer() object.ObjectStoreServer { func ProvideFakeObjectServer() object.ObjectStoreServer {
return &fakeObjectStore{} return &fakeObjectStore{}
} }
type fakeObjectStore struct{} type fakeObjectStore struct{}
func (i fakeObjectStore) AdminWrite(ctx context.Context, r *object.AdminWriteObjectRequest) (*object.WriteObjectResponse, error) {
return nil, fmt.Errorf("unimplemented")
}
func (i fakeObjectStore) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) { func (i fakeObjectStore) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) {
return nil, fmt.Errorf("unimplemented") return nil, fmt.Errorf("unimplemented")
} }

View File

@ -102,10 +102,10 @@ func (codec *rawObjectCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream)
stream.WriteInt64(obj.Size) stream.WriteInt64(obj.Size)
} }
if obj.Sync != nil { if obj.Origin != nil {
stream.WriteMore() stream.WriteMore()
stream.WriteObjectField("sync") stream.WriteObjectField("origin")
stream.WriteVal(obj.Sync) stream.WriteVal(obj.Origin)
} }
stream.WriteObjectEnd() stream.WriteObjectEnd()
@ -137,9 +137,9 @@ func readRawObject(iter *jsoniter.Iterator, raw *RawObject) {
raw.ETag = iter.ReadString() raw.ETag = iter.ReadString()
case "version": case "version":
raw.Version = iter.ReadString() raw.Version = iter.ReadString()
case "sync": case "origin":
raw.Sync = &RawObjectSyncInfo{} raw.Origin = &ObjectOriginInfo{}
iter.ReadVal(raw.Sync) iter.ReadVal(raw.Origin)
case "body": case "body":
var val interface{} var val interface{}

File diff suppressed because it is too large Load Diff

View File

@ -53,10 +53,10 @@ message RawObject {
string version = 9; string version = 9;
// External location info // External location info
RawObjectSyncInfo sync = 10; ObjectOriginInfo origin = 10;
} }
message RawObjectSyncInfo { message ObjectOriginInfo {
// NOTE: currently managed by the dashboard_provisioning table // NOTE: currently managed by the dashboard_provisioning table
string source = 1; string source = 1;
@ -156,6 +156,51 @@ message WriteObjectRequest {
string previous_version = 4; string previous_version = 4;
} }
// This operation is useful when syncing a resource from external sources
// that have more accurate metadata information (git, or an archive).
// This process can bypass the forced checks that
message AdminWriteObjectRequest {
// Object identifier
GRN GRN = 1;
// The raw object body
bytes body = 2;
// Message that can be seen when exploring object history
string comment = 3;
// Time in epoch milliseconds that the object was created
// Optional, if 0 it will use the current time
int64 created_at = 4;
// Time in epoch milliseconds that the object was updated
// Optional, if empty it will use the current user
int64 updated_at = 5;
// Who created the object
// Optional, if 0 it will use the current time
string created_by = 6;
// Who updated the object
// Optional, if empty it will use the current user
string updated_by = 7;
// An explicit version identifier
// Optional, if set, this will overwrite/define an explicit version
string version = 8;
// Used for optimistic locking. If missing, the previous version will be replaced regardless
// This may not be used along with an explicit version in the request
string previous_version = 9;
// Request that all previous versions are removed from the history
// This will make sense for systems that manage history explicitly externallay
bool clear_history = 10;
// Optionally define where the object came from
string origin = 11;
}
message WriteObjectResponse { message WriteObjectResponse {
// Error info -- if exists, the save did not happen // Error info -- if exists, the save did not happen
ObjectErrorInfo error = 1; ObjectErrorInfo error = 1;
@ -312,8 +357,7 @@ message ObjectSearchResponse {
// Storage interface // Storage interface
//----------------------------------------------- //-----------------------------------------------
// This assumes a future grpc interface where the user info is passed in context, not in each message body // The object store provides a basic CRUD (+watch eventually) interface for generic objects
// for now it will only work with an admin API key
service ObjectStore { service ObjectStore {
rpc Read(ReadObjectRequest) returns (ReadObjectResponse); rpc Read(ReadObjectRequest) returns (ReadObjectResponse);
rpc BatchRead(BatchReadObjectRequest) returns (BatchReadObjectResponse); rpc BatchRead(BatchReadObjectRequest) returns (BatchReadObjectResponse);
@ -325,4 +369,13 @@ service ObjectStore {
// Ideally an additional search endpoint with more flexibility to limit what you actually care about // Ideally an additional search endpoint with more flexibility to limit what you actually care about
// https://github.com/grafana/grafana-plugin-sdk-go/blob/main/proto/backend.proto#L129 // https://github.com/grafana/grafana-plugin-sdk-go/blob/main/proto/backend.proto#L129
// rpc SearchEX(ObjectSearchRequest) returns (DataResponse); // rpc SearchEX(ObjectSearchRequest) returns (DataResponse);
// TEMPORARY... while we split this into a new service (see below)
rpc AdminWrite(AdminWriteObjectRequest) returns (WriteObjectResponse);
}
// The admin service extends the basic object store interface, but provides
// more explicit control that can support bulk operations like efficient git sync
service ObjectStoreAdmin {
rpc AdminWrite(AdminWriteObjectRequest) returns (WriteObjectResponse);
} }

View File

@ -28,6 +28,8 @@ type ObjectStoreClient interface {
Delete(ctx context.Context, in *DeleteObjectRequest, opts ...grpc.CallOption) (*DeleteObjectResponse, error) Delete(ctx context.Context, in *DeleteObjectRequest, opts ...grpc.CallOption) (*DeleteObjectResponse, error)
History(ctx context.Context, in *ObjectHistoryRequest, opts ...grpc.CallOption) (*ObjectHistoryResponse, error) History(ctx context.Context, in *ObjectHistoryRequest, opts ...grpc.CallOption) (*ObjectHistoryResponse, error)
Search(ctx context.Context, in *ObjectSearchRequest, opts ...grpc.CallOption) (*ObjectSearchResponse, error) Search(ctx context.Context, in *ObjectSearchRequest, opts ...grpc.CallOption) (*ObjectSearchResponse, error)
// TEMPORARY... while we split this into a new service (see below)
AdminWrite(ctx context.Context, in *AdminWriteObjectRequest, opts ...grpc.CallOption) (*WriteObjectResponse, error)
} }
type objectStoreClient struct { type objectStoreClient struct {
@ -92,6 +94,15 @@ func (c *objectStoreClient) Search(ctx context.Context, in *ObjectSearchRequest,
return out, nil return out, nil
} }
func (c *objectStoreClient) AdminWrite(ctx context.Context, in *AdminWriteObjectRequest, opts ...grpc.CallOption) (*WriteObjectResponse, error) {
out := new(WriteObjectResponse)
err := c.cc.Invoke(ctx, "/object.ObjectStore/AdminWrite", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ObjectStoreServer is the server API for ObjectStore service. // ObjectStoreServer is the server API for ObjectStore service.
// All implementations should embed UnimplementedObjectStoreServer // All implementations should embed UnimplementedObjectStoreServer
// for forward compatibility // for forward compatibility
@ -102,6 +113,8 @@ type ObjectStoreServer interface {
Delete(context.Context, *DeleteObjectRequest) (*DeleteObjectResponse, error) Delete(context.Context, *DeleteObjectRequest) (*DeleteObjectResponse, error)
History(context.Context, *ObjectHistoryRequest) (*ObjectHistoryResponse, error) History(context.Context, *ObjectHistoryRequest) (*ObjectHistoryResponse, error)
Search(context.Context, *ObjectSearchRequest) (*ObjectSearchResponse, error) Search(context.Context, *ObjectSearchRequest) (*ObjectSearchResponse, error)
// TEMPORARY... while we split this into a new service (see below)
AdminWrite(context.Context, *AdminWriteObjectRequest) (*WriteObjectResponse, error)
} }
// UnimplementedObjectStoreServer should be embedded to have forward compatible implementations. // UnimplementedObjectStoreServer should be embedded to have forward compatible implementations.
@ -126,6 +139,9 @@ func (UnimplementedObjectStoreServer) History(context.Context, *ObjectHistoryReq
func (UnimplementedObjectStoreServer) Search(context.Context, *ObjectSearchRequest) (*ObjectSearchResponse, error) { func (UnimplementedObjectStoreServer) Search(context.Context, *ObjectSearchRequest) (*ObjectSearchResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Search not implemented") return nil, status.Errorf(codes.Unimplemented, "method Search not implemented")
} }
func (UnimplementedObjectStoreServer) AdminWrite(context.Context, *AdminWriteObjectRequest) (*WriteObjectResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AdminWrite not implemented")
}
// UnsafeObjectStoreServer may be embedded to opt out of forward compatibility for this service. // UnsafeObjectStoreServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ObjectStoreServer will // Use of this interface is not recommended, as added methods to ObjectStoreServer will
@ -246,6 +262,24 @@ func _ObjectStore_Search_Handler(srv interface{}, ctx context.Context, dec func(
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _ObjectStore_AdminWrite_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AdminWriteObjectRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ObjectStoreServer).AdminWrite(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/object.ObjectStore/AdminWrite",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ObjectStoreServer).AdminWrite(ctx, req.(*AdminWriteObjectRequest))
}
return interceptor(ctx, in, info, handler)
}
// ObjectStore_ServiceDesc is the grpc.ServiceDesc for ObjectStore service. // ObjectStore_ServiceDesc is the grpc.ServiceDesc for ObjectStore service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -277,6 +311,94 @@ var ObjectStore_ServiceDesc = grpc.ServiceDesc{
MethodName: "Search", MethodName: "Search",
Handler: _ObjectStore_Search_Handler, Handler: _ObjectStore_Search_Handler,
}, },
{
MethodName: "AdminWrite",
Handler: _ObjectStore_AdminWrite_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "object.proto",
}
// ObjectStoreAdminClient is the client API for ObjectStoreAdmin service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ObjectStoreAdminClient interface {
AdminWrite(ctx context.Context, in *AdminWriteObjectRequest, opts ...grpc.CallOption) (*WriteObjectResponse, error)
}
type objectStoreAdminClient struct {
cc grpc.ClientConnInterface
}
func NewObjectStoreAdminClient(cc grpc.ClientConnInterface) ObjectStoreAdminClient {
return &objectStoreAdminClient{cc}
}
func (c *objectStoreAdminClient) AdminWrite(ctx context.Context, in *AdminWriteObjectRequest, opts ...grpc.CallOption) (*WriteObjectResponse, error) {
out := new(WriteObjectResponse)
err := c.cc.Invoke(ctx, "/object.ObjectStoreAdmin/AdminWrite", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ObjectStoreAdminServer is the server API for ObjectStoreAdmin service.
// All implementations should embed UnimplementedObjectStoreAdminServer
// for forward compatibility
type ObjectStoreAdminServer interface {
AdminWrite(context.Context, *AdminWriteObjectRequest) (*WriteObjectResponse, error)
}
// UnimplementedObjectStoreAdminServer should be embedded to have forward compatible implementations.
type UnimplementedObjectStoreAdminServer struct {
}
func (UnimplementedObjectStoreAdminServer) AdminWrite(context.Context, *AdminWriteObjectRequest) (*WriteObjectResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AdminWrite not implemented")
}
// UnsafeObjectStoreAdminServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ObjectStoreAdminServer will
// result in compilation errors.
type UnsafeObjectStoreAdminServer interface {
mustEmbedUnimplementedObjectStoreAdminServer()
}
func RegisterObjectStoreAdminServer(s grpc.ServiceRegistrar, srv ObjectStoreAdminServer) {
s.RegisterService(&ObjectStoreAdmin_ServiceDesc, srv)
}
func _ObjectStoreAdmin_AdminWrite_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AdminWriteObjectRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ObjectStoreAdminServer).AdminWrite(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/object.ObjectStoreAdmin/AdminWrite",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ObjectStoreAdminServer).AdminWrite(ctx, req.(*AdminWriteObjectRequest))
}
return interceptor(ctx, in, info, handler)
}
// ObjectStoreAdmin_ServiceDesc is the grpc.ServiceDesc for ObjectStoreAdmin service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ObjectStoreAdmin_ServiceDesc = grpc.ServiceDesc{
ServiceName: "object.ObjectStoreAdmin",
HandlerType: (*ObjectStoreAdminServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "AdminWrite",
Handler: _ObjectStoreAdmin_AdminWrite_Handler,
},
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},
Metadata: "object.proto", Metadata: "object.proto",

View File

@ -23,6 +23,10 @@ import (
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
) )
// Make sure we implement both store + admin
var _ object.ObjectStoreServer = &sqlObjectServer{}
var _ object.ObjectStoreAdminServer = &sqlObjectServer{}
func ProvideSQLObjectServer(db db.DB, cfg *setting.Cfg, grpcServerProvider grpcserver.Provider, kinds kind.KindRegistry, resolver resolver.ObjectReferenceResolver) object.ObjectStoreServer { func ProvideSQLObjectServer(db db.DB, cfg *setting.Cfg, grpcServerProvider grpcserver.Provider, kinds kind.KindRegistry, resolver resolver.ObjectReferenceResolver) object.ObjectStoreServer {
objectServer := &sqlObjectServer{ objectServer := &sqlObjectServer{
sess: db.GetSqlxSession(), sess: db.GetSqlxSession(),
@ -49,7 +53,7 @@ func getReadSelect(r *object.ReadObjectRequest) string {
"size", "etag", "errors", // errors are always returned "size", "etag", "errors", // errors are always returned
"created_at", "created_by", "created_at", "created_by",
"updated_at", "updated_by", "updated_at", "updated_by",
"sync_src", "sync_time"} "origin", "origin_ts"}
if r.WithBody { if r.WithBody {
fields = append(fields, `body`) fields = append(fields, `body`)
@ -62,8 +66,8 @@ func getReadSelect(r *object.ReadObjectRequest) string {
func (s *sqlObjectServer) rowToReadObjectResponse(ctx context.Context, rows *sql.Rows, r *object.ReadObjectRequest) (*object.ReadObjectResponse, error) { func (s *sqlObjectServer) rowToReadObjectResponse(ctx context.Context, rows *sql.Rows, r *object.ReadObjectRequest) (*object.ReadObjectResponse, error) {
path := "" // string (extract UID?) path := "" // string (extract UID?)
var syncSrc sql.NullString var origin sql.NullString
var syncTime sql.NullTime originTime := int64(0)
raw := &object.RawObject{ raw := &object.RawObject{
GRN: &object.GRN{}, GRN: &object.GRN{},
} }
@ -74,7 +78,7 @@ func (s *sqlObjectServer) rowToReadObjectResponse(ctx context.Context, rows *sql
&raw.Size, &raw.ETag, &summaryjson.errors, &raw.Size, &raw.ETag, &summaryjson.errors,
&raw.CreatedAt, &raw.CreatedBy, &raw.CreatedAt, &raw.CreatedBy,
&raw.UpdatedAt, &raw.UpdatedBy, &raw.UpdatedAt, &raw.UpdatedBy,
&syncSrc, &syncTime, &origin, &originTime,
} }
if r.WithBody { if r.WithBody {
args = append(args, &raw.Body) args = append(args, &raw.Body)
@ -88,10 +92,10 @@ func (s *sqlObjectServer) rowToReadObjectResponse(ctx context.Context, rows *sql
return nil, err return nil, err
} }
if syncSrc.Valid || syncTime.Valid { if origin.Valid {
raw.Sync = &object.RawObjectSyncInfo{ raw.Origin = &object.ObjectOriginInfo{
Source: syncSrc.String, Source: origin.String,
Time: syncTime.Time.UnixMilli(), Time: originTime,
} }
} }
@ -273,6 +277,11 @@ func (s *sqlObjectServer) BatchRead(ctx context.Context, b *object.BatchReadObje
} }
func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) { func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectRequest) (*object.WriteObjectResponse, error) {
return s.AdminWrite(ctx, object.ToAdminWriteObjectRequest(r))
}
//nolint:gocyclo
func (s *sqlObjectServer) AdminWrite(ctx context.Context, r *object.AdminWriteObjectRequest) (*object.WriteObjectResponse, error) {
route, err := s.getObjectKey(ctx, r.GRN) route, err := s.getObjectKey(ctx, r.GRN)
if err != nil { if err != nil {
return nil, err return nil, err
@ -282,9 +291,20 @@ func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectReques
return nil, fmt.Errorf("invalid grn") return nil, fmt.Errorf("invalid grn")
} }
modifier := store.UserFromContext(ctx) timestamp := time.Now().UnixMilli()
if modifier == nil { createdAt := r.CreatedAt
return nil, fmt.Errorf("can not find user in context") createdBy := r.CreatedBy
updatedAt := r.UpdatedAt
updatedBy := r.UpdatedBy
if updatedBy == "" {
modifier := store.UserFromContext(ctx)
if modifier == nil {
return nil, fmt.Errorf("can not find user in context")
}
updatedBy = store.GetUserIDString(modifier)
}
if updatedAt < 1000 {
updatedAt = timestamp
} }
summary, body, err := s.prepare(ctx, r) summary, body, err := s.prepare(ctx, r)
@ -309,10 +329,26 @@ func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectReques
} }
err = s.sess.WithTransaction(ctx, func(tx *session.SessionTx) error { err = s.sess.WithTransaction(ctx, func(tx *session.SessionTx) error {
var versionInfo *object.ObjectVersionInfo
isUpdate := false isUpdate := false
versionInfo, err := s.selectForUpdate(ctx, tx, path) if r.ClearHistory {
if err != nil { // Optionally keep the original creation time information
return err if createdAt < 1000 || createdBy == "" {
err = s.fillCreationInfo(ctx, tx, path, &createdAt, &createdBy)
if err != nil {
return err
}
}
_, err = doDelete(ctx, tx, path)
if err != nil {
return err
}
versionInfo = &object.ObjectVersionInfo{}
} else {
versionInfo, err = s.selectForUpdate(ctx, tx, path)
if err != nil {
return err
}
} }
// Same object // Same object
@ -330,18 +366,21 @@ func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectReques
} }
// Set the comment on this write // Set the comment on this write
timestamp := time.Now().UnixMilli()
versionInfo.Comment = r.Comment versionInfo.Comment = r.Comment
if versionInfo.Version == "" { if r.Version == "" {
versionInfo.Version = "1" if versionInfo.Version == "" {
} else { versionInfo.Version = "1"
// Increment the version } else {
i, _ := strconv.ParseInt(versionInfo.Version, 0, 64) // Increment the version
if i < 1 { i, _ := strconv.ParseInt(versionInfo.Version, 0, 64)
i = timestamp if i < 1 {
i = timestamp
}
versionInfo.Version = fmt.Sprintf("%d", i+1)
isUpdate = true
} }
versionInfo.Version = fmt.Sprintf("%d", i+1) } else {
isUpdate = true versionInfo.Version = r.Version
} }
if isUpdate { if isUpdate {
@ -357,8 +396,8 @@ func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectReques
// 1. Add the `object_history` values // 1. Add the `object_history` values
versionInfo.Size = int64(len(body)) versionInfo.Size = int64(len(body))
versionInfo.ETag = etag versionInfo.ETag = etag
versionInfo.UpdatedAt = timestamp versionInfo.UpdatedAt = updatedAt
versionInfo.UpdatedBy = store.GetUserIDString(modifier) versionInfo.UpdatedBy = updatedBy
_, err = tx.Exec(ctx, `INSERT INTO object_history (`+ _, err = tx.Exec(ctx, `INSERT INTO object_history (`+
"path, version, message, "+ "path, version, message, "+
"size, body, etag, "+ "size, body, etag, "+
@ -366,7 +405,7 @@ func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectReques
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)", "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
path, versionInfo.Version, versionInfo.Comment, path, versionInfo.Version, versionInfo.Comment,
versionInfo.Size, body, versionInfo.ETag, versionInfo.Size, body, versionInfo.ETag,
timestamp, versionInfo.UpdatedBy, updatedAt, versionInfo.UpdatedBy,
) )
if err != nil { if err != nil {
return err return err
@ -411,27 +450,35 @@ func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectReques
"body=?, size=?, etag=?, version=?, "+ "body=?, size=?, etag=?, version=?, "+
"updated_at=?, updated_by=?,"+ "updated_at=?, updated_by=?,"+
"name=?, description=?,"+ "name=?, description=?,"+
"labels=?, fields=?, errors=? "+ "labels=?, fields=?, errors=?, "+
"origin=?, origin_ts=? "+
"WHERE path=?", "WHERE path=?",
body, versionInfo.Size, etag, versionInfo.Version, body, versionInfo.Size, etag, versionInfo.Version,
timestamp, versionInfo.UpdatedBy, updatedAt, versionInfo.UpdatedBy,
summary.model.Name, summary.model.Description, summary.model.Name, summary.model.Description,
summary.labels, summary.fields, summary.errors, summary.labels, summary.fields, summary.errors,
r.Origin, timestamp,
path, path,
) )
return err return err
} }
// Insert the new row if createdAt < 1000 {
createdAt = updatedAt
}
if createdBy == "" {
createdBy = updatedBy
}
_, err = tx.Exec(ctx, "INSERT INTO object ("+ _, err = tx.Exec(ctx, "INSERT INTO object ("+
"path, parent_folder_path, kind, size, body, etag, version,"+ "path, parent_folder_path, kind, size, body, etag, version, "+
"updated_at, updated_by, created_at, created_by,"+ "updated_at, updated_by, created_at, created_by, "+
"name, description,"+ "name, description, origin, origin_ts, "+
"labels, fields, errors) "+ "labels, fields, errors) "+
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
path, getParentFolderPath(grn.Kind, path), grn.Kind, versionInfo.Size, body, etag, versionInfo.Version, path, getParentFolderPath(grn.Kind, path), grn.Kind, versionInfo.Size, body, etag, versionInfo.Version,
timestamp, versionInfo.UpdatedBy, timestamp, versionInfo.UpdatedBy, // created + updated are the same updatedAt, createdBy, createdAt, createdBy, // created + updated are the same
summary.model.Name, summary.model.Description, summary.model.Name, summary.model.Description, r.Origin, timestamp,
summary.labels, summary.fields, summary.errors, summary.labels, summary.fields, summary.errors,
) )
return err return err
@ -443,6 +490,28 @@ func (s *sqlObjectServer) Write(ctx context.Context, r *object.WriteObjectReques
return rsp, err return rsp, err
} }
func (s *sqlObjectServer) fillCreationInfo(ctx context.Context, tx *session.SessionTx, path string, createdAt *int64, createdBy *string) error {
if *createdAt > 1000 {
ignore := int64(0)
createdAt = &ignore
}
if *createdBy == "" {
ignore := ""
createdBy = &ignore
}
rows, err := tx.Query(ctx, "SELECT created_at,created_by FROM object WHERE path=?", path)
if err == nil {
if rows.Next() {
err = rows.Scan(&createdAt, &createdBy)
}
if err == nil {
err = rows.Close()
}
}
return err
}
func (s *sqlObjectServer) selectForUpdate(ctx context.Context, tx *session.SessionTx, path string) (*object.ObjectVersionInfo, error) { func (s *sqlObjectServer) selectForUpdate(ctx context.Context, tx *session.SessionTx, path string) (*object.ObjectVersionInfo, error) {
q := "SELECT etag,version,updated_at,size FROM object WHERE path=?" q := "SELECT etag,version,updated_at,size FROM object WHERE path=?"
if false { // TODO, MYSQL/PosgreSQL can lock the row " FOR UPDATE" if false { // TODO, MYSQL/PosgreSQL can lock the row " FOR UPDATE"
@ -462,7 +531,7 @@ func (s *sqlObjectServer) selectForUpdate(ctx context.Context, tx *session.Sessi
return current, err return current, err
} }
func (s *sqlObjectServer) prepare(ctx context.Context, r *object.WriteObjectRequest) (*summarySupport, []byte, error) { func (s *sqlObjectServer) prepare(ctx context.Context, r *object.AdminWriteObjectRequest) (*summarySupport, []byte, error) {
grn := r.GRN grn := r.GRN
builder := s.kinds.GetSummaryBuilder(grn.Kind) builder := s.kinds.GetSummaryBuilder(grn.Kind)
if builder == nil { if builder == nil {
@ -490,27 +559,29 @@ func (s *sqlObjectServer) Delete(ctx context.Context, r *object.DeleteObjectRequ
rsp := &object.DeleteObjectResponse{} rsp := &object.DeleteObjectResponse{}
err = s.sess.WithTransaction(ctx, func(tx *session.SessionTx) error { err = s.sess.WithTransaction(ctx, func(tx *session.SessionTx) error {
results, err := tx.Exec(ctx, "DELETE FROM object WHERE path=?", path) rsp.OK, err = doDelete(ctx, tx, path)
if err != nil { return err
return err
}
rows, err := results.RowsAffected()
if err != nil {
return err
}
if rows > 0 {
rsp.OK = true
}
// TODO: keep history? would need current version bump, and the "write" would have to get from history
_, _ = tx.Exec(ctx, "DELETE FROM object_history WHERE path=?", path)
_, _ = tx.Exec(ctx, "DELETE FROM object_labels WHERE path=?", path)
_, _ = tx.Exec(ctx, "DELETE FROM object_ref WHERE path=?", path)
return nil
}) })
return rsp, err return rsp, err
} }
func doDelete(ctx context.Context, tx *session.SessionTx, path string) (bool, error) {
results, err := tx.Exec(ctx, "DELETE FROM object WHERE path=?", path)
if err != nil {
return false, err
}
rows, err := results.RowsAffected()
if err != nil {
return false, err
}
// TODO: keep history? would need current version bump, and the "write" would have to get from history
_, _ = tx.Exec(ctx, "DELETE FROM object_history WHERE path=?", path)
_, _ = tx.Exec(ctx, "DELETE FROM object_labels WHERE path=?", path)
_, _ = tx.Exec(ctx, "DELETE FROM object_ref WHERE path=?", path)
return rows > 0, err
}
func (s *sqlObjectServer) History(ctx context.Context, r *object.ObjectHistoryRequest) (*object.ObjectHistoryResponse, error) { func (s *sqlObjectServer) History(ctx context.Context, r *object.ObjectHistoryRequest) (*object.ObjectHistoryResponse, error) {
route, err := s.getObjectKey(ctx, r.GRN) route, err := s.getObjectKey(ctx, r.GRN)
if err != nil { if err != nil {

View File

@ -0,0 +1,11 @@
package object
// The admin request is a superset of write request features
func ToAdminWriteObjectRequest(req *WriteObjectRequest) *AdminWriteObjectRequest {
return &AdminWriteObjectRequest{
GRN: req.GRN,
Body: req.Body,
Comment: req.Comment,
PreviousVersion: req.PreviousVersion,
}
}