mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
merged main and moved folder structure
This commit is contained in:
2
Makefile
2
Makefile
@@ -379,7 +379,7 @@ protobuf: ## Compile protobuf definitions
|
||||
# buf generate pkg/plugins/backendplugin/pluginextensionv2 --template pkg/plugins/backendplugin/pluginextensionv2/buf.gen.yaml
|
||||
# buf generate pkg/plugins/backendplugin/secretsmanagerplugin --template pkg/plugins/backendplugin/secretsmanagerplugin/buf.gen.yaml
|
||||
# buf generate pkg/services/store/entity --template pkg/services/store/entity/buf.gen.yaml
|
||||
buf generate pkg/storage/api --template pkg/storage/api/buf.gen.yaml
|
||||
buf generate pkg/storage/unified --template pkg/storage/unified/buf.gen.yaml
|
||||
|
||||
.PHONY: clean
|
||||
clean: ## Clean up intermediate build artifacts.
|
||||
|
||||
2
go.work
2
go.work
@@ -6,7 +6,7 @@ use (
|
||||
./pkg/apiserver
|
||||
./pkg/build/wire
|
||||
./pkg/promlib
|
||||
./pkg/storage/api
|
||||
./pkg/storage/unified
|
||||
./pkg/util/xorm
|
||||
)
|
||||
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
version: v1
|
||||
plugins:
|
||||
- plugin: go
|
||||
out: pkg/services/store/resource
|
||||
opt: paths=source_relative
|
||||
- plugin: go-grpc
|
||||
out: pkg/services/store/resource
|
||||
opt:
|
||||
- paths=source_relative
|
||||
- require_unimplemented_servers=false
|
||||
@@ -1,219 +0,0 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/services/auth/identity"
|
||||
)
|
||||
|
||||
type WriteEvent struct {
|
||||
EventID int64
|
||||
Key *Key // the request key
|
||||
Requester identity.Requester
|
||||
Operation ResourceOperation
|
||||
PreviousRV int64 // only for Update+Delete
|
||||
Value []byte
|
||||
|
||||
Object utils.GrafanaMetaAccessor
|
||||
OldObject utils.GrafanaMetaAccessor
|
||||
|
||||
// Change metadata
|
||||
FolderChanged bool
|
||||
|
||||
// The status will be populated for any error
|
||||
Status *StatusResult
|
||||
Error error
|
||||
}
|
||||
|
||||
func (e *WriteEvent) BadRequest(err error, message string, a ...any) *WriteEvent {
|
||||
e.Error = err
|
||||
e.Status = &StatusResult{
|
||||
Status: "Failure",
|
||||
Message: fmt.Sprintf(message, a...),
|
||||
Code: http.StatusBadRequest,
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Verify that all required fields are set, and the user has permission to set the common metadata fields
|
||||
type EventValidator interface {
|
||||
PrepareCreate(ctx context.Context, req *CreateRequest) (*WriteEvent, error)
|
||||
PrepareUpdate(ctx context.Context, req *UpdateRequest, current []byte) (*WriteEvent, error)
|
||||
}
|
||||
|
||||
type EventValidatorOptions struct {
|
||||
// Get the next EventID
|
||||
NextEventID func() int64
|
||||
|
||||
// Check if a user has access to write folders
|
||||
// When this is nil, no resources can have folders configured
|
||||
FolderAccess func(ctx context.Context, user identity.Requester, uid string) bool
|
||||
|
||||
// When configured, this will make sure a user is allowed to save to a given origin
|
||||
OriginAccess func(ctx context.Context, user identity.Requester, origin string) bool
|
||||
}
|
||||
|
||||
type eventValidator struct {
|
||||
opts EventValidatorOptions
|
||||
}
|
||||
|
||||
func NewEventValidator(opts EventValidatorOptions) EventValidator {
|
||||
if opts.NextEventID == nil {
|
||||
counter := atomic.Int64{}
|
||||
opts.NextEventID = func() int64 {
|
||||
return counter.Add(1)
|
||||
}
|
||||
}
|
||||
return &eventValidator{opts}
|
||||
}
|
||||
|
||||
type dummyObject struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
metav1.ObjectMeta `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
var _ EventValidator = &eventValidator{}
|
||||
|
||||
func (v *eventValidator) newEvent(ctx context.Context, key *Key, value, oldValue []byte) *WriteEvent {
|
||||
var err error
|
||||
event := &WriteEvent{
|
||||
EventID: v.opts.NextEventID(),
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
event.Requester, err = appcontext.User(ctx)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "unable to get user")
|
||||
}
|
||||
|
||||
dummy := &dummyObject{}
|
||||
err = json.Unmarshal(value, dummy)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "error reading json")
|
||||
}
|
||||
|
||||
obj, err := utils.MetaAccessor(dummy)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "invalid object in json")
|
||||
}
|
||||
if obj.GetUID() == "" {
|
||||
return event.BadRequest(nil, "the UID must be set")
|
||||
}
|
||||
if obj.GetGenerateName() != "" {
|
||||
return event.BadRequest(nil, "can not save value with generate name")
|
||||
}
|
||||
if obj.GetKind() == "" {
|
||||
return event.BadRequest(nil, "expecting resources with a kind in the body")
|
||||
}
|
||||
if obj.GetName() != key.Name {
|
||||
return event.BadRequest(nil, "key name does not match the name in the body")
|
||||
}
|
||||
if obj.GetNamespace() != key.Namespace {
|
||||
return event.BadRequest(nil, "key namespace does not match the namespace in the body")
|
||||
}
|
||||
folder := obj.GetFolder()
|
||||
if folder != "" {
|
||||
if v.opts.FolderAccess == nil {
|
||||
return event.BadRequest(err, "folders are not supported")
|
||||
} else if !v.opts.FolderAccess(ctx, event.Requester, folder) {
|
||||
return event.BadRequest(err, "unable to add resource to folder") // 403?
|
||||
}
|
||||
}
|
||||
origin, err := obj.GetOriginInfo()
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "invalid origin info")
|
||||
}
|
||||
if origin != nil && v.opts.OriginAccess != nil {
|
||||
if !v.opts.OriginAccess(ctx, event.Requester, origin.Name) {
|
||||
return event.BadRequest(err, "not allowed to write resource to origin (%s)", origin.Name)
|
||||
}
|
||||
}
|
||||
event.Object = obj
|
||||
|
||||
// This is an update
|
||||
if oldValue != nil {
|
||||
dummy := &dummyObject{}
|
||||
err = json.Unmarshal(oldValue, dummy)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "error reading old json value")
|
||||
}
|
||||
old, err := utils.MetaAccessor(dummy)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "invalid object inside old json")
|
||||
}
|
||||
if key.Name != old.GetName() {
|
||||
return event.BadRequest(err, "the old value has a different name (%s != %s)", key.Name, old.GetName())
|
||||
}
|
||||
|
||||
// Can not change creation timestamps+user
|
||||
if obj.GetCreatedBy() != old.GetCreatedBy() {
|
||||
return event.BadRequest(err, "can not change the created by metadata (%s != %s)", obj.GetCreatedBy(), old.GetCreatedBy())
|
||||
}
|
||||
if obj.GetCreationTimestamp() != old.GetCreationTimestamp() {
|
||||
return event.BadRequest(err, "can not change the CreationTimestamp metadata (%v != %v)", obj.GetCreationTimestamp(), old.GetCreationTimestamp())
|
||||
}
|
||||
|
||||
oldFolder := obj.GetFolder()
|
||||
if oldFolder != folder {
|
||||
event.FolderChanged = true
|
||||
}
|
||||
event.OldObject = old
|
||||
} else if folder != "" {
|
||||
event.FolderChanged = true
|
||||
}
|
||||
return event
|
||||
}
|
||||
|
||||
func (v *eventValidator) PrepareCreate(ctx context.Context, req *CreateRequest) (*WriteEvent, error) {
|
||||
event := v.newEvent(ctx, req.Key, req.Value, nil)
|
||||
event.Operation = ResourceOperation_CREATED
|
||||
if event.Status != nil {
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// Make sure the created by user is accurate
|
||||
//----------------------------------------
|
||||
val := event.Object.GetCreatedBy()
|
||||
if val != "" && val != event.Requester.GetUID().String() {
|
||||
return event.BadRequest(nil, "created by annotation does not match: metadata.annotations#"+utils.AnnoKeyCreatedBy), nil
|
||||
}
|
||||
|
||||
// Create can not have updated properties
|
||||
//----------------------------------------
|
||||
if event.Object.GetUpdatedBy() != "" {
|
||||
return event.BadRequest(nil, "unexpected metadata.annotations#"+utils.AnnoKeyCreatedBy), nil
|
||||
}
|
||||
ts, err := event.Object.GetUpdatedTimestamp()
|
||||
if err != nil {
|
||||
return event.BadRequest(nil, fmt.Sprintf("invalid timestamp: %s", err)), nil
|
||||
}
|
||||
if ts != nil {
|
||||
return event.BadRequest(nil, "unexpected metadata.annotations#"+utils.AnnoKeyUpdatedTimestamp), nil
|
||||
}
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (v *eventValidator) PrepareUpdate(ctx context.Context, req *UpdateRequest, current []byte) (*WriteEvent, error) {
|
||||
event := v.newEvent(ctx, req.Key, req.Value, current)
|
||||
event.Operation = ResourceOperation_UPDATED
|
||||
if event.Status != nil {
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// Make sure the update user is accurate
|
||||
//----------------------------------------
|
||||
val := event.Object.GetUpdatedBy()
|
||||
if val != "" && val != event.Requester.GetUID().String() {
|
||||
return event.BadRequest(nil, "created by annotation does not match: metadata.annotations#"+utils.AnnoKeyUpdatedBy), nil
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// NamespacedPath is a path that can be used to isolate tenant data
|
||||
// NOTE: this strategy does not allow quickly searching across namespace boundaries with a prefix
|
||||
func (x *Key) NamespacedPath() string {
|
||||
var buffer bytes.Buffer
|
||||
if x.Namespace == "" {
|
||||
buffer.WriteString("__cluster__")
|
||||
} else {
|
||||
buffer.WriteString(x.Namespace)
|
||||
}
|
||||
if x.Group == "" {
|
||||
return buffer.String()
|
||||
}
|
||||
buffer.WriteString("/")
|
||||
buffer.WriteString(x.Group)
|
||||
|
||||
if x.Resource == "" {
|
||||
return buffer.String()
|
||||
}
|
||||
buffer.WriteString("/")
|
||||
buffer.WriteString(x.Resource)
|
||||
|
||||
if x.Name == "" {
|
||||
return buffer.String()
|
||||
}
|
||||
buffer.WriteString("/")
|
||||
buffer.WriteString(x.Name)
|
||||
|
||||
if x.ResourceVersion > 0 {
|
||||
buffer.WriteString("/")
|
||||
buffer.WriteString(fmt.Sprintf("%.20d", x.ResourceVersion))
|
||||
}
|
||||
return buffer.String()
|
||||
}
|
||||
|
||||
// Return a copy without the resource version
|
||||
func (x *Key) WithoutResourceVersion() *Key {
|
||||
return &Key{
|
||||
Namespace: x.Namespace,
|
||||
Group: x.Group,
|
||||
Resource: x.Resource,
|
||||
Name: x.Name,
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,416 +0,0 @@
|
||||
syntax = "proto3";
|
||||
package resource;
|
||||
|
||||
option go_package = "github.com/grafana/grafana/pkg/services/store/resource";
|
||||
|
||||
|
||||
message Key {
|
||||
// Namespace (tenant)
|
||||
string namespace = 2;
|
||||
// Resource Group
|
||||
string group = 1;
|
||||
// The resource type
|
||||
string resource = 3;
|
||||
// Resource identifier (unique within namespace+group+resource)
|
||||
string name = 4;
|
||||
// The resource version
|
||||
int64 resource_version = 5;
|
||||
}
|
||||
|
||||
enum ResourceOperation {
|
||||
UNKNOWN = 0;
|
||||
CREATED = 1;
|
||||
UPDATED = 2;
|
||||
DELETED = 3;
|
||||
BOOKMARK = 4;
|
||||
}
|
||||
|
||||
message ResourceWrapper {
|
||||
// The resource version
|
||||
int64 resource_version = 1;
|
||||
|
||||
// Full kubernetes json bytes (although the resource version may not be accurate)
|
||||
bytes value = 2;
|
||||
|
||||
// Operation
|
||||
ResourceOperation operation = 3;
|
||||
|
||||
// The resource has an attached blob
|
||||
bool has_blob = 4;
|
||||
}
|
||||
|
||||
// The history and trash commands need access to commit messages
|
||||
message ResourceMeta {
|
||||
// The resource version
|
||||
int64 resource_version = 1;
|
||||
|
||||
// The optional commit message
|
||||
ResourceOperation operation = 2;
|
||||
|
||||
// Size of the full resource body
|
||||
int32 size = 3;
|
||||
|
||||
// Hash for the resource
|
||||
string hash = 4;
|
||||
|
||||
// The optional commit message
|
||||
string message = 5;
|
||||
|
||||
// The kubernetes metadata section (not the full resource)
|
||||
// https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L111
|
||||
bytes object_meta = 6;
|
||||
|
||||
// The resource has an attached blob
|
||||
bool has_blob = 7;
|
||||
}
|
||||
|
||||
// Basic blob metadata
|
||||
message BlobInfo {
|
||||
// Content Length
|
||||
int64 size = 1;
|
||||
|
||||
// MD5 digest of the body
|
||||
string ETag = 2;
|
||||
|
||||
// Content type header
|
||||
string content_type = 3;
|
||||
}
|
||||
|
||||
// Status structure is copied from:
|
||||
// https://github.com/kubernetes/apimachinery/blob/v0.30.1/pkg/apis/meta/v1/generated.proto#L979
|
||||
message StatusResult {
|
||||
// Status of the operation.
|
||||
// One of: "Success" or "Failure".
|
||||
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
|
||||
// +optional
|
||||
string status = 1;
|
||||
// A human-readable description of the status of this operation.
|
||||
// +optional
|
||||
string message = 2;
|
||||
// A machine-readable description of why this operation is in the
|
||||
// "Failure" status. If this value is empty there
|
||||
// is no information available. A Reason clarifies an HTTP status
|
||||
// code but does not override it.
|
||||
// +optional
|
||||
string reason = 3;
|
||||
// Suggested HTTP return code for this status, 0 if not set.
|
||||
// +optional
|
||||
int32 code = 4;
|
||||
}
|
||||
|
||||
// TODO? support PresignedUrls for upload?
|
||||
message CreateBlob {
|
||||
// Content type header
|
||||
string content_type = 1;
|
||||
|
||||
// Raw value to write
|
||||
bytes value = 2;
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// CRUD Objects
|
||||
// ----------------------------------
|
||||
|
||||
message CreateRequest {
|
||||
// Requires group+resource to be configuired
|
||||
// If name is not set, a unique name will be generated
|
||||
// The resourceVersion should not be set
|
||||
Key key = 1;
|
||||
|
||||
// The resource JSON.
|
||||
bytes value = 2;
|
||||
|
||||
// Optional commit message
|
||||
string message = 3;
|
||||
|
||||
// Optionally include a large binary object
|
||||
CreateBlob blob = 4;
|
||||
}
|
||||
|
||||
message CreateResponse {
|
||||
// Status code
|
||||
StatusResult status = 1;
|
||||
|
||||
// The updated resource version
|
||||
int64 resource_version = 2;
|
||||
}
|
||||
|
||||
message UpdateRequest {
|
||||
// Full key must be set
|
||||
Key key = 1;
|
||||
|
||||
// The resource JSON.
|
||||
bytes value = 2;
|
||||
|
||||
// Optional commit message
|
||||
// +optional
|
||||
string message = 3;
|
||||
|
||||
// Optionally link a resource object
|
||||
CreateBlob blob = 4;
|
||||
}
|
||||
|
||||
message UpdateResponse {
|
||||
// Status code
|
||||
StatusResult status = 1;
|
||||
|
||||
// The updated resource version
|
||||
int64 resource_version = 2;
|
||||
}
|
||||
|
||||
message DeleteRequest {
|
||||
Key key = 1;
|
||||
|
||||
// Preconditions: make sure the uid matches the current saved value
|
||||
// +optional
|
||||
string uid = 2;
|
||||
}
|
||||
|
||||
message DeleteResponse {
|
||||
// Status code
|
||||
StatusResult status = 1;
|
||||
|
||||
// The new resource version
|
||||
int64 resource_version = 2;
|
||||
}
|
||||
|
||||
message GetResourceRequest {
|
||||
Key key = 1;
|
||||
}
|
||||
|
||||
message GetResourceResponse {
|
||||
// Status code
|
||||
StatusResult status = 1;
|
||||
|
||||
// The new resource version
|
||||
int64 resource_version = 2;
|
||||
|
||||
// The properties
|
||||
bytes value = 3;
|
||||
|
||||
// A Signed URL that will let you fetch the blob
|
||||
// If this value starts with # you must read the bytes using the GetResourceBlob request
|
||||
string blob_url = 4;
|
||||
}
|
||||
|
||||
message GetBlobRequest {
|
||||
Key key = 1;
|
||||
}
|
||||
|
||||
message GetBlobResponse {
|
||||
// Status code
|
||||
StatusResult status = 1;
|
||||
|
||||
// Headers
|
||||
BlobInfo info = 2;
|
||||
|
||||
// The raw object value
|
||||
bytes value = 3;
|
||||
}
|
||||
|
||||
// ----------------------------------
|
||||
// List Request/Response
|
||||
// ----------------------------------
|
||||
|
||||
// The label filtering requirements:
|
||||
// https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/labels/selector.go#L141
|
||||
message Requirement {
|
||||
string key = 1;
|
||||
string operator = 2; // See https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/selection/operator.go#L21
|
||||
repeated string values = 3; // typically one value, but depends on the operator
|
||||
}
|
||||
|
||||
message Sort {
|
||||
enum Order {
|
||||
ASC = 0;
|
||||
DESC = 1;
|
||||
}
|
||||
string field = 1;
|
||||
Order order = 2;
|
||||
}
|
||||
|
||||
message ListOptions {
|
||||
// Maximum number of items to return
|
||||
// NOTE responses will also be limited by the response payload size
|
||||
int64 limit = 2;
|
||||
|
||||
// Namespace+Group+Resource+etc
|
||||
Key key = 3;
|
||||
|
||||
// Match label
|
||||
repeated Requirement labels = 4;
|
||||
|
||||
// Match fields (not yet supported)
|
||||
repeated Requirement fields = 5;
|
||||
|
||||
// Limit results to items in a specific folder (not a query for everything under that folder!)
|
||||
string folder = 6;
|
||||
}
|
||||
|
||||
message ListRequest {
|
||||
// Starting from the requested page (other query parameters must match!)
|
||||
string next_page_token = 1;
|
||||
|
||||
// Filtering
|
||||
ListOptions options = 2;
|
||||
|
||||
// Sorting instructions `field ASC/DESC`
|
||||
repeated Sort sort = 3;
|
||||
}
|
||||
|
||||
message ListResponse {
|
||||
repeated ResourceWrapper items = 1;
|
||||
|
||||
// When more results exist, pass this in the next request
|
||||
string next_page_token = 2;
|
||||
|
||||
// ResourceVersion of the list response
|
||||
int64 resource_version = 3;
|
||||
|
||||
// remainingItemCount is the number of subsequent items in the list which are not included in this
|
||||
// list response. If the list request contained label or field selectors, then the number of
|
||||
// remaining items is unknown and the field will be left unset and omitted during serialization.
|
||||
// If the list is complete (either because it is not chunking or because this is the last chunk),
|
||||
// then there are no more remaining items and this field will be left unset and omitted during
|
||||
// serialization.
|
||||
//
|
||||
// The intended use of the remainingItemCount is *estimating* the size of a collection. Clients
|
||||
// should not rely on the remainingItemCount to be set or to be exact.
|
||||
// +optional
|
||||
int64 remaining_item_count = 4; // 0 won't be set either (no next page token)
|
||||
}
|
||||
|
||||
message WatchRequest {
|
||||
// ResourceVersion of last changes. Empty will default to full history
|
||||
int64 since = 1;
|
||||
|
||||
// Watch specific entities
|
||||
Key key = 2;
|
||||
|
||||
// Additional options
|
||||
ListOptions options = 3;
|
||||
|
||||
// Return initial events
|
||||
bool send_initial_events = 4;
|
||||
|
||||
// When done with initial events, send a bookmark event
|
||||
bool allow_watch_bookmarks = 5;
|
||||
}
|
||||
|
||||
message WatchResponse {
|
||||
// Timestamp the event was sent
|
||||
int64 timestamp = 1;
|
||||
|
||||
// Entity that was created, updated, or deleted
|
||||
ResourceWrapper resource = 2;
|
||||
|
||||
// previous version of the entity (in update+delete events)
|
||||
ResourceWrapper previous = 3;
|
||||
}
|
||||
|
||||
message HistoryRequest {
|
||||
// Starting from the requested page (other query parameters must match!)
|
||||
string next_page_token = 1;
|
||||
|
||||
// Maximum number of items to return
|
||||
int64 limit = 2;
|
||||
|
||||
// Resource identifier
|
||||
Key key = 3;
|
||||
|
||||
// List the deleted values (eg, show trash)
|
||||
bool show_deleted = 4;
|
||||
}
|
||||
|
||||
message HistoryResponse {
|
||||
repeated ResourceMeta items = 1;
|
||||
|
||||
// More results exist... pass this in the next request
|
||||
string next_page_token = 2;
|
||||
|
||||
// ResourceVersion of the list response
|
||||
int64 resource_version = 3;
|
||||
}
|
||||
|
||||
message OriginRequest {
|
||||
// Starting from the requested page (other query parameters must match!)
|
||||
string next_page_token = 1;
|
||||
|
||||
// Maximum number of items to return
|
||||
int64 limit = 2;
|
||||
|
||||
// Resource identifier
|
||||
Key key = 3;
|
||||
|
||||
// List the deleted values (eg, show trash)
|
||||
string origin = 4;
|
||||
}
|
||||
|
||||
message ResourceOriginInfo {
|
||||
// The resource
|
||||
Key key = 1;
|
||||
|
||||
// Size of the full resource body
|
||||
int32 resource_size = 2;
|
||||
|
||||
// Hash for the resource
|
||||
string resource_hash = 3;
|
||||
|
||||
// The origin name
|
||||
string origin = 4;
|
||||
|
||||
// Path on the origin
|
||||
string path = 5;
|
||||
|
||||
// Verification hash from the origin
|
||||
string hash = 6;
|
||||
|
||||
// Change time from the origin
|
||||
int64 timestamp = 7;
|
||||
}
|
||||
|
||||
message OriginResponse {
|
||||
repeated ResourceOriginInfo items = 1;
|
||||
|
||||
// More results exist... pass this in the next request
|
||||
string next_page_token = 2;
|
||||
|
||||
// ResourceVersion of the list response
|
||||
int64 resource_version = 3;
|
||||
}
|
||||
|
||||
message HealthCheckRequest {
|
||||
string service = 1;
|
||||
}
|
||||
|
||||
message HealthCheckResponse {
|
||||
enum ServingStatus {
|
||||
UNKNOWN = 0;
|
||||
SERVING = 1;
|
||||
NOT_SERVING = 2;
|
||||
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
|
||||
}
|
||||
ServingStatus status = 1;
|
||||
}
|
||||
|
||||
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
|
||||
service ResourceStore {
|
||||
rpc GetResource(GetResourceRequest) returns (GetResourceResponse);
|
||||
rpc Create(CreateRequest) returns (CreateResponse);
|
||||
rpc Update(UpdateRequest) returns (UpdateResponse);
|
||||
rpc Delete(DeleteRequest) returns (DeleteResponse);
|
||||
rpc List(ListRequest) returns (ListResponse);
|
||||
rpc Watch(WatchRequest) returns (stream WatchResponse);
|
||||
|
||||
// Get the raw blob bytes and metadata
|
||||
rpc GetBlob(GetBlobRequest) returns (GetBlobResponse);
|
||||
|
||||
// Show resource history (and trash)
|
||||
rpc History(HistoryRequest) returns (HistoryResponse);
|
||||
|
||||
// Used for efficient provisioning
|
||||
rpc Origin(OriginRequest) returns (OriginResponse);
|
||||
|
||||
// Check if the service is healthy
|
||||
rpc IsHealthy(HealthCheckRequest) returns (HealthCheckResponse);
|
||||
}
|
||||
@@ -1,490 +0,0 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.4.0
|
||||
// - protoc (unknown)
|
||||
// source: resource.proto
|
||||
|
||||
package resource
|
||||
|
||||
import (
|
||||
context "context"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.62.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion8
|
||||
|
||||
const (
|
||||
ResourceStore_GetResource_FullMethodName = "/resource.ResourceStore/GetResource"
|
||||
ResourceStore_Create_FullMethodName = "/resource.ResourceStore/Create"
|
||||
ResourceStore_Update_FullMethodName = "/resource.ResourceStore/Update"
|
||||
ResourceStore_Delete_FullMethodName = "/resource.ResourceStore/Delete"
|
||||
ResourceStore_List_FullMethodName = "/resource.ResourceStore/List"
|
||||
ResourceStore_Watch_FullMethodName = "/resource.ResourceStore/Watch"
|
||||
ResourceStore_GetBlob_FullMethodName = "/resource.ResourceStore/GetBlob"
|
||||
ResourceStore_History_FullMethodName = "/resource.ResourceStore/History"
|
||||
ResourceStore_Origin_FullMethodName = "/resource.ResourceStore/Origin"
|
||||
ResourceStore_IsHealthy_FullMethodName = "/resource.ResourceStore/IsHealthy"
|
||||
)
|
||||
|
||||
// ResourceStoreClient is the client API for ResourceStore service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
//
|
||||
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
|
||||
type ResourceStoreClient interface {
|
||||
GetResource(ctx context.Context, in *GetResourceRequest, opts ...grpc.CallOption) (*GetResourceResponse, error)
|
||||
Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error)
|
||||
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error)
|
||||
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
|
||||
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
|
||||
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error)
|
||||
// Get the raw blob bytes and metadata
|
||||
GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error)
|
||||
// Show resource history (and trash)
|
||||
History(ctx context.Context, in *HistoryRequest, opts ...grpc.CallOption) (*HistoryResponse, error)
|
||||
// Used for efficient provisioning
|
||||
Origin(ctx context.Context, in *OriginRequest, opts ...grpc.CallOption) (*OriginResponse, error)
|
||||
// Check if the service is healthy
|
||||
IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
|
||||
}
|
||||
|
||||
type resourceStoreClient struct {
|
||||
cc grpc.ClientConnInterface
|
||||
}
|
||||
|
||||
func NewResourceStoreClient(cc grpc.ClientConnInterface) ResourceStoreClient {
|
||||
return &resourceStoreClient{cc}
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) GetResource(ctx context.Context, in *GetResourceRequest, opts ...grpc.CallOption) (*GetResourceResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(GetResourceResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_GetResource_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(CreateResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_Create_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(UpdateResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_Update_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(DeleteResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_Delete_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(ListResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_List_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &ResourceStore_ServiceDesc.Streams[0], ResourceStore_Watch_FullMethodName, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &resourceStoreWatchClient{ClientStream: stream}
|
||||
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := x.ClientStream.CloseSend(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type ResourceStore_WatchClient interface {
|
||||
Recv() (*WatchResponse, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type resourceStoreWatchClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *resourceStoreWatchClient) Recv() (*WatchResponse, error) {
|
||||
m := new(WatchResponse)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(GetBlobResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_GetBlob_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) History(ctx context.Context, in *HistoryRequest, opts ...grpc.CallOption) (*HistoryResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(HistoryResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_History_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) Origin(ctx context.Context, in *OriginRequest, opts ...grpc.CallOption) (*OriginResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(OriginResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_Origin_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceStoreClient) IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(HealthCheckResponse)
|
||||
err := c.cc.Invoke(ctx, ResourceStore_IsHealthy_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// ResourceStoreServer is the server API for ResourceStore service.
|
||||
// All implementations should embed UnimplementedResourceStoreServer
|
||||
// for forward compatibility
|
||||
//
|
||||
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
|
||||
type ResourceStoreServer interface {
|
||||
GetResource(context.Context, *GetResourceRequest) (*GetResourceResponse, error)
|
||||
Create(context.Context, *CreateRequest) (*CreateResponse, error)
|
||||
Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
|
||||
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
|
||||
List(context.Context, *ListRequest) (*ListResponse, error)
|
||||
Watch(*WatchRequest, ResourceStore_WatchServer) error
|
||||
// Get the raw blob bytes and metadata
|
||||
GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error)
|
||||
// Show resource history (and trash)
|
||||
History(context.Context, *HistoryRequest) (*HistoryResponse, error)
|
||||
// Used for efficient provisioning
|
||||
Origin(context.Context, *OriginRequest) (*OriginResponse, error)
|
||||
// Check if the service is healthy
|
||||
IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
|
||||
}
|
||||
|
||||
// UnimplementedResourceStoreServer should be embedded to have forward compatible implementations.
|
||||
type UnimplementedResourceStoreServer struct {
|
||||
}
|
||||
|
||||
func (UnimplementedResourceStoreServer) GetResource(context.Context, *GetResourceRequest) (*GetResourceResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetResource not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) Create(context.Context, *CreateRequest) (*CreateResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Create not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) Update(context.Context, *UpdateRequest) (*UpdateResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Update not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) List(context.Context, *ListRequest) (*ListResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) Watch(*WatchRequest, ResourceStore_WatchServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetBlob not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) History(context.Context, *HistoryRequest) (*HistoryResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method History not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) Origin(context.Context, *OriginRequest) (*OriginResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Origin not implemented")
|
||||
}
|
||||
func (UnimplementedResourceStoreServer) IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method IsHealthy not implemented")
|
||||
}
|
||||
|
||||
// UnsafeResourceStoreServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to ResourceStoreServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeResourceStoreServer interface {
|
||||
mustEmbedUnimplementedResourceStoreServer()
|
||||
}
|
||||
|
||||
func RegisterResourceStoreServer(s grpc.ServiceRegistrar, srv ResourceStoreServer) {
|
||||
s.RegisterService(&ResourceStore_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _ResourceStore_GetResource_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(GetResourceRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).GetResource(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_GetResource_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).GetResource(ctx, req.(*GetResourceRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(CreateRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).Create(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_Create_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).Create(ctx, req.(*CreateRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(UpdateRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).Update(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_Update_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).Update(ctx, req.(*UpdateRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(DeleteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).Delete(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_Delete_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).Delete(ctx, req.(*DeleteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).List(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_List_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).List(ctx, req.(*ListRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
m := new(WatchRequest)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return srv.(ResourceStoreServer).Watch(m, &resourceStoreWatchServer{ServerStream: stream})
|
||||
}
|
||||
|
||||
type ResourceStore_WatchServer interface {
|
||||
Send(*WatchResponse) error
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type resourceStoreWatchServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *resourceStoreWatchServer) Send(m *WatchResponse) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func _ResourceStore_GetBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(GetBlobRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).GetBlob(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_GetBlob_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).GetBlob(ctx, req.(*GetBlobRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_History_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(HistoryRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).History(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_History_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).History(ctx, req.(*HistoryRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_Origin_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(OriginRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).Origin(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_Origin_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).Origin(ctx, req.(*OriginRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceStore_IsHealthy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(HealthCheckRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceStoreServer).IsHealthy(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ResourceStore_IsHealthy_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceStoreServer).IsHealthy(ctx, req.(*HealthCheckRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// ResourceStore_ServiceDesc is the grpc.ServiceDesc for ResourceStore service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var ResourceStore_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "resource.ResourceStore",
|
||||
HandlerType: (*ResourceStoreServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "GetResource",
|
||||
Handler: _ResourceStore_GetResource_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Create",
|
||||
Handler: _ResourceStore_Create_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Update",
|
||||
Handler: _ResourceStore_Update_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Delete",
|
||||
Handler: _ResourceStore_Delete_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "List",
|
||||
Handler: _ResourceStore_List_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "GetBlob",
|
||||
Handler: _ResourceStore_GetBlob_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "History",
|
||||
Handler: _ResourceStore_History_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Origin",
|
||||
Handler: _ResourceStore_Origin_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "IsHealthy",
|
||||
Handler: _ResourceStore_IsHealthy_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "Watch",
|
||||
Handler: _ResourceStore_Watch_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "resource.proto",
|
||||
}
|
||||
@@ -1,105 +0,0 @@
|
||||
package resource_server_tests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/satokengen"
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/server"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
saAPI "github.com/grafana/grafana/pkg/services/serviceaccounts/api"
|
||||
saTests "github.com/grafana/grafana/pkg/services/serviceaccounts/tests"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl"
|
||||
"github.com/grafana/grafana/pkg/services/store/resource"
|
||||
"github.com/grafana/grafana/pkg/services/store/resource/sqlstash"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/tests/testinfra"
|
||||
"github.com/grafana/grafana/pkg/tests/testsuite"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
testsuite.Run(m)
|
||||
}
|
||||
|
||||
func createServiceAccountAdminToken(t *testing.T, env *server.TestEnv) (string, *user.SignedInUser) {
|
||||
t.Helper()
|
||||
|
||||
account := saTests.SetupUserServiceAccount(t, env.SQLStore, env.Cfg, saTests.TestUser{
|
||||
Name: "grpc-server-sa",
|
||||
Role: string(org.RoleAdmin),
|
||||
Login: "grpc-server-sa",
|
||||
IsServiceAccount: true,
|
||||
})
|
||||
|
||||
keyGen, err := satokengen.New(saAPI.ServiceID)
|
||||
require.NoError(t, err)
|
||||
|
||||
_ = saTests.SetupApiKey(t, env.SQLStore, env.Cfg, saTests.TestApiKey{
|
||||
Name: "grpc-server-test",
|
||||
Role: org.RoleAdmin,
|
||||
OrgId: account.OrgID,
|
||||
Key: keyGen.HashedKey,
|
||||
ServiceAccountID: &account.ID,
|
||||
})
|
||||
|
||||
return keyGen.ClientSecret, &user.SignedInUser{
|
||||
UserID: account.ID,
|
||||
Email: account.Email,
|
||||
Name: account.Name,
|
||||
Login: account.Login,
|
||||
OrgID: account.OrgID,
|
||||
IsServiceAccount: account.IsServiceAccount,
|
||||
}
|
||||
}
|
||||
|
||||
type testContext struct {
|
||||
authToken string
|
||||
client resource.ResourceStoreClient
|
||||
user *user.SignedInUser
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func createTestContext(t *testing.T) testContext {
|
||||
t.Helper()
|
||||
|
||||
dir, path := testinfra.CreateGrafDir(t, testinfra.GrafanaOpts{
|
||||
EnableFeatureToggles: []string{
|
||||
featuremgmt.FlagGrpcServer,
|
||||
featuremgmt.FlagUnifiedStorage,
|
||||
},
|
||||
AppModeProduction: false, // required for migrations to run
|
||||
GRPCServerAddress: "127.0.0.1:0", // :0 for choosing the port automatically
|
||||
})
|
||||
|
||||
_, env := testinfra.StartGrafanaEnv(t, dir, path)
|
||||
|
||||
authToken, serviceAccountUser := createServiceAccountAdminToken(t, env)
|
||||
|
||||
eDB, err := dbimpl.ProvideEntityDB(env.SQLStore, env.Cfg, env.FeatureToggles, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = eDB.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
traceConfig, err := tracing.ParseTracingConfig(env.Cfg)
|
||||
require.NoError(t, err)
|
||||
tracer, err := tracing.ProvideService(traceConfig)
|
||||
require.NoError(t, err)
|
||||
store, err := sqlstash.ProvideSQLResourceServer(eDB, tracer)
|
||||
require.NoError(t, err)
|
||||
|
||||
client := resource.NewResourceStoreClientLocal(store)
|
||||
|
||||
return testContext{
|
||||
authToken: authToken,
|
||||
client: client,
|
||||
user: serviceAccountUser,
|
||||
ctx: appcontext.WithUser(context.Background(), serviceAccountUser),
|
||||
}
|
||||
}
|
||||
@@ -1,84 +0,0 @@
|
||||
package resource_server_tests
|
||||
|
||||
import (
|
||||
_ "embed"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apis/playlist/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/services/store/resource"
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed testdata/dashboard-with-tags-b-g.json
|
||||
dashboardWithTagsBlueGreen string
|
||||
//go:embed testdata/dashboard-with-tags-r-g.json
|
||||
dashboardWithTagsRedGreen string
|
||||
)
|
||||
|
||||
func TestIntegrationEntityServer(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test")
|
||||
}
|
||||
|
||||
testCtx := createTestContext(t)
|
||||
ctx := appcontext.WithUser(testCtx.ctx, testCtx.user)
|
||||
|
||||
t.Run("should not retrieve non-existent objects", func(t *testing.T) {
|
||||
resp, err := testCtx.client.GetResource(ctx, &resource.GetResourceRequest{
|
||||
Key: &resource.Key{
|
||||
Group: "X",
|
||||
Namespace: "X",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.NotNil(t, resp.Status)
|
||||
require.Nil(t, resp.Value)
|
||||
require.Equal(t, int32(http.StatusBadRequest), resp.Status.Code)
|
||||
})
|
||||
|
||||
t.Run("insert an object", func(t *testing.T) {
|
||||
var err error
|
||||
key := &resource.Key{
|
||||
Namespace: "default",
|
||||
Group: "playlists.grafana.app",
|
||||
Resource: "Playlist",
|
||||
Name: "x123",
|
||||
}
|
||||
sample := v0alpha1.Playlist{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: key.Name,
|
||||
Namespace: key.Namespace,
|
||||
UID: types.UID("xyz"),
|
||||
},
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: key.Resource,
|
||||
APIVersion: key.Group + "/v0alpha1",
|
||||
},
|
||||
Spec: v0alpha1.Spec{
|
||||
Title: "hello",
|
||||
},
|
||||
}
|
||||
req := &resource.CreateRequest{
|
||||
Key: key,
|
||||
}
|
||||
req.Value, err = json.Marshal(sample)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Printf("%s", string(req.Value))
|
||||
|
||||
resp, err := testCtx.client.Create(ctx, req)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.Nil(t, resp.Status)
|
||||
require.True(t, resp.ResourceVersion > 0) // that it has a positive resource version
|
||||
})
|
||||
}
|
||||
@@ -1,39 +0,0 @@
|
||||
{
|
||||
"tags": [
|
||||
"blue",
|
||||
"green"
|
||||
],
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 0,
|
||||
"id": 221,
|
||||
"links": [],
|
||||
"liveNow": false,
|
||||
"panels": [
|
||||
{
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 8
|
||||
},
|
||||
"id": 8,
|
||||
"title": "Row title",
|
||||
"type": "row"
|
||||
}
|
||||
],
|
||||
"schemaVersion": 36,
|
||||
"templating": {
|
||||
"list": []
|
||||
},
|
||||
"time": {
|
||||
"from": "now-6h",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "special ds",
|
||||
"uid": "mocpwtR4k",
|
||||
"version": 1,
|
||||
"weekStart": ""
|
||||
}
|
||||
@@ -1,39 +0,0 @@
|
||||
{
|
||||
"tags": [
|
||||
"red",
|
||||
"green"
|
||||
],
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 0,
|
||||
"id": 221,
|
||||
"links": [],
|
||||
"liveNow": false,
|
||||
"panels": [
|
||||
{
|
||||
"gridPos": {
|
||||
"h": 1,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 8
|
||||
},
|
||||
"id": 8,
|
||||
"title": "Row title",
|
||||
"type": "row"
|
||||
}
|
||||
],
|
||||
"schemaVersion": 36,
|
||||
"templating": {
|
||||
"list": []
|
||||
},
|
||||
"time": {
|
||||
"from": "now-6h",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "special ds",
|
||||
"uid": "mocpwtR4k",
|
||||
"version": 1,
|
||||
"weekStart": ""
|
||||
}
|
||||
@@ -1,36 +0,0 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
// ObjectKey creates a key for a given object
|
||||
func ObjectKey(gr schema.GroupResource, obj metav1.Object) (*Key, error) {
|
||||
if gr.Group == "" {
|
||||
return nil, fmt.Errorf("missing group")
|
||||
}
|
||||
if gr.Resource == "" {
|
||||
return nil, fmt.Errorf("missing resource")
|
||||
}
|
||||
if obj.GetName() == "" {
|
||||
return nil, fmt.Errorf("object is missing name")
|
||||
}
|
||||
key := &Key{
|
||||
Group: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.GetName(),
|
||||
}
|
||||
if obj.GetResourceVersion() != "" {
|
||||
var err error
|
||||
key.ResourceVersion, err = strconv.ParseInt(obj.GetResourceVersion(), 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage requires numeric revision version %w", err)
|
||||
}
|
||||
}
|
||||
return key, nil
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
version: v1
|
||||
breaking:
|
||||
use:
|
||||
- FILE
|
||||
lint:
|
||||
use:
|
||||
- DEFAULT
|
||||
25
pkg/storage/api/testdata/01_create_playlist.json
vendored
25
pkg/storage/api/testdata/01_create_playlist.json
vendored
@@ -1,25 +0,0 @@
|
||||
{
|
||||
"apiVersion": "playlist.grafana.app/v0alpha1",
|
||||
"kind": "Playlist",
|
||||
"metadata": {
|
||||
"name": "fdgsv37qslr0ga",
|
||||
"namespace": "default",
|
||||
"annotations": {
|
||||
"grafana.app/originName": "elsewhere",
|
||||
"grafana.app/originPath": "path/to/item",
|
||||
"grafana.app/originTimestamp": "2024-02-02T00:00:00Z"
|
||||
},
|
||||
"creationTimestamp": "2024-03-03T00:00:00Z",
|
||||
"uid": "8tGrXJgGbFI0"
|
||||
},
|
||||
"spec": {
|
||||
"title": "hello",
|
||||
"interval": "5m",
|
||||
"items": [
|
||||
{
|
||||
"type": "dashboard_by_uid",
|
||||
"value": "vmie2cmWz"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -8,29 +8,29 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
playlist "github.com/grafana/grafana/pkg/apis/playlist/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/models/roletype"
|
||||
"github.com/grafana/grafana/pkg/services/store/resource"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
)
|
||||
|
||||
func TestSQLCommands(t *testing.T) {
|
||||
ctx := appcontext.WithUser(context.Background(), &user.SignedInUser{
|
||||
UserID: 123,
|
||||
UserUID: "u123",
|
||||
OrgRole: roletype.RoleAdmin,
|
||||
OrgRole: identity.RoleAdmin,
|
||||
})
|
||||
validator := resource.NewEventValidator(resource.EventValidatorOptions{
|
||||
validator := unified.NewEventValidator(unified.EventValidatorOptions{
|
||||
// no folders for now
|
||||
})
|
||||
|
||||
t.Run("insert playlist SQL", func(t *testing.T) {
|
||||
input := testdataFromJSON(t, "01_create_playlist.json", &playlist.Playlist{})
|
||||
key, err := resource.ObjectKey(playlist.PlaylistResourceInfo.GroupResource(), input)
|
||||
key, err := unified.ResourceKeyFor(playlist.PlaylistResourceInfo.GroupResource(), input)
|
||||
require.NoError(t, err)
|
||||
|
||||
req := &resource.CreateRequest{Key: key, Message: "test commit"}
|
||||
req := &unified.CreateRequest{Key: key, Message: "test commit"}
|
||||
req.Value, err = json.Marshal(input)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "default/playlist.grafana.app/playlists/fdgsv37qslr0ga", key.NamespacedPath())
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/db"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
|
||||
"github.com/grafana/grafana/pkg/services/store/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
)
|
||||
|
||||
const resoruceTable = "resource"
|
||||
@@ -35,7 +35,7 @@ var (
|
||||
)
|
||||
|
||||
// Make sure we implement correct interfaces
|
||||
var _ resource.ResourceStoreServer = &sqlResourceServer{}
|
||||
var _ unified.ResourceStoreServer = &sqlResourceServer{}
|
||||
|
||||
func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (SqlResourceServer, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -56,7 +56,7 @@ func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (S
|
||||
}
|
||||
|
||||
type SqlResourceServer interface {
|
||||
resource.ResourceStoreServer
|
||||
unified.ResourceStoreServer
|
||||
|
||||
Init() error
|
||||
Stop()
|
||||
@@ -67,12 +67,12 @@ type sqlResourceServer struct {
|
||||
db db.EntityDBInterface // needed to keep xorm engine in scope
|
||||
sess *session.SessionDB
|
||||
dialect migrator.Dialect
|
||||
broadcaster sqlstash.Broadcaster[*resource.WatchResponse]
|
||||
broadcaster sqlstash.Broadcaster[*unified.WatchResponse]
|
||||
ctx context.Context // TODO: remove
|
||||
cancel context.CancelFunc
|
||||
stream chan *resource.WatchResponse
|
||||
stream chan *unified.WatchResponse
|
||||
tracer trace.Tracer
|
||||
validator resource.EventValidator
|
||||
validator unified.EventValidator
|
||||
|
||||
once sync.Once
|
||||
initErr error
|
||||
@@ -138,12 +138,12 @@ func (s *sqlResourceServer) init() error {
|
||||
|
||||
s.sess = sess
|
||||
s.dialect = migrator.NewDialect(engine.DriverName())
|
||||
s.validator = resource.NewEventValidator(resource.EventValidatorOptions{
|
||||
s.validator = unified.NewEventValidator(unified.EventValidatorOptions{
|
||||
// use snowflake IDs
|
||||
})
|
||||
|
||||
// set up the broadcaster
|
||||
s.broadcaster, err = sqlstash.NewBroadcaster(s.ctx, func(stream chan *resource.WatchResponse) error {
|
||||
s.broadcaster, err = sqlstash.NewBroadcaster(s.ctx, func(stream chan *unified.WatchResponse) error {
|
||||
s.stream = stream
|
||||
|
||||
// start the poller
|
||||
@@ -158,7 +158,7 @@ func (s *sqlResourceServer) init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
|
||||
func (s *sqlResourceServer) IsHealthy(ctx context.Context, r *unified.HealthCheckRequest) (*unified.HealthCheckResponse, error) {
|
||||
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
|
||||
if err := s.Init(); err != nil {
|
||||
ctxLogger.Error("init error", "error", err)
|
||||
@@ -169,14 +169,14 @@ func (s *sqlResourceServer) IsHealthy(ctx context.Context, r *resource.HealthChe
|
||||
return nil, err
|
||||
}
|
||||
// TODO: check the status of the watcher implementation as well
|
||||
return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil
|
||||
return &unified.HealthCheckResponse{Status: unified.HealthCheckResponse_SERVING}, nil
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) Stop() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) GetResource(ctx context.Context, req *resource.GetResourceRequest) (*resource.GetResourceResponse, error) {
|
||||
func (s *sqlResourceServer) GetResource(ctx context.Context, req *unified.GetResourceRequest) (*unified.GetResourceResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.GetResource")
|
||||
defer span.End()
|
||||
|
||||
@@ -185,10 +185,10 @@ func (s *sqlResourceServer) GetResource(ctx context.Context, req *resource.GetRe
|
||||
}
|
||||
|
||||
if req.Key.Group == "" {
|
||||
return &resource.GetResourceResponse{Status: badRequest("missing group")}, nil
|
||||
return &unified.GetResourceResponse{Status: badRequest("missing group")}, nil
|
||||
}
|
||||
if req.Key.Resource == "" {
|
||||
return &resource.GetResourceResponse{Status: badRequest("missing resource")}, nil
|
||||
return &unified.GetResourceResponse{Status: badRequest("missing resource")}, nil
|
||||
}
|
||||
|
||||
fmt.Printf("TODO, GET: %+v", req.Key)
|
||||
@@ -196,12 +196,12 @@ func (s *sqlResourceServer) GetResource(ctx context.Context, req *resource.GetRe
|
||||
return nil, ErrNotImplementedYet
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) Create(ctx context.Context, req *resource.CreateRequest) (*resource.CreateResponse, error) {
|
||||
func (s *sqlResourceServer) Create(ctx context.Context, req *unified.CreateRequest) (*unified.CreateResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
|
||||
defer span.End()
|
||||
|
||||
if req.Key.ResourceVersion > 0 {
|
||||
return &resource.CreateResponse{
|
||||
return &unified.CreateResponse{
|
||||
Status: badRequest("can not update a specific resource version"),
|
||||
}, nil
|
||||
}
|
||||
@@ -215,7 +215,7 @@ func (s *sqlResourceServer) Create(ctx context.Context, req *resource.CreateRequ
|
||||
return nil, err
|
||||
}
|
||||
if event.Status != nil {
|
||||
return &resource.CreateResponse{Status: event.Status}, nil
|
||||
return &unified.CreateResponse{Status: event.Status}, nil
|
||||
}
|
||||
|
||||
fmt.Printf("TODO, CREATE: %v", event)
|
||||
@@ -223,12 +223,12 @@ func (s *sqlResourceServer) Create(ctx context.Context, req *resource.CreateRequ
|
||||
return nil, ErrNotImplementedYet
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) Update(ctx context.Context, req *resource.UpdateRequest) (*resource.UpdateResponse, error) {
|
||||
func (s *sqlResourceServer) Update(ctx context.Context, req *unified.UpdateRequest) (*unified.UpdateResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
|
||||
defer span.End()
|
||||
|
||||
if req.Key.ResourceVersion < 0 {
|
||||
return &resource.UpdateResponse{
|
||||
return &unified.UpdateResponse{
|
||||
Status: badRequest("update must include the previous version"),
|
||||
}, nil
|
||||
}
|
||||
@@ -237,29 +237,18 @@ func (s *sqlResourceServer) Update(ctx context.Context, req *resource.UpdateRequ
|
||||
return nil, err
|
||||
}
|
||||
|
||||
latest, err := s.GetResource(ctx, &resource.GetResourceRequest{
|
||||
latest, err := s.GetResource(ctx, &unified.GetResourceRequest{
|
||||
Key: req.Key.WithoutResourceVersion(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if latest.Value == nil {
|
||||
return &resource.UpdateResponse{
|
||||
Status: badRequest("existing value does not exist"),
|
||||
}, nil
|
||||
}
|
||||
if latest.ResourceVersion != req.Key.ResourceVersion {
|
||||
return &resource.UpdateResponse{
|
||||
Status: badRequest("not the latest resource version"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
event, err := s.validator.PrepareUpdate(ctx, req, latest.Value)
|
||||
event, err := s.validator.PrepareUpdate(ctx, req, latest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if event.Status != nil {
|
||||
return &resource.UpdateResponse{Status: event.Status}, nil
|
||||
return &unified.UpdateResponse{Status: event.Status}, nil
|
||||
}
|
||||
|
||||
fmt.Printf("TODO, UPDATE: %v", event)
|
||||
@@ -267,20 +256,40 @@ func (s *sqlResourceServer) Update(ctx context.Context, req *resource.UpdateRequ
|
||||
return nil, ErrNotImplementedYet
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) Delete(ctx context.Context, req *resource.DeleteRequest) (*resource.DeleteResponse, error) {
|
||||
func (s *sqlResourceServer) Delete(ctx context.Context, req *unified.DeleteRequest) (*unified.DeleteResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
|
||||
defer span.End()
|
||||
|
||||
if req.Key.ResourceVersion < 0 {
|
||||
return &unified.DeleteResponse{
|
||||
Status: badRequest("update must include the previous version"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
if err := s.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Printf("TODO, DELETE: %+v // %s", req.Key, ctx.Value("X"))
|
||||
latest, err := s.GetResource(ctx, &unified.GetResourceRequest{
|
||||
Key: req.Key.WithoutResourceVersion(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
event, err := s.validator.PrepareDelete(ctx, req, latest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if event.Status != nil {
|
||||
return &unified.DeleteResponse{Status: event.Status}, nil
|
||||
}
|
||||
|
||||
fmt.Printf("TODO, DELETE: %+v ", req.Key)
|
||||
|
||||
return nil, ErrNotImplementedYet
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
|
||||
func (s *sqlResourceServer) List(ctx context.Context, req *unified.ListRequest) (*unified.ListResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.List")
|
||||
defer span.End()
|
||||
|
||||
@@ -315,7 +324,7 @@ func (s *sqlResourceServer) List(ctx context.Context, req *resource.ListRequest)
|
||||
}
|
||||
|
||||
// Get the raw blob bytes and metadata
|
||||
func (s *sqlResourceServer) GetBlob(ctx context.Context, req *resource.GetBlobRequest) (*resource.GetBlobResponse, error) {
|
||||
func (s *sqlResourceServer) GetBlob(ctx context.Context, req *unified.GetBlobRequest) (*unified.GetBlobResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.List")
|
||||
defer span.End()
|
||||
|
||||
@@ -329,7 +338,7 @@ func (s *sqlResourceServer) GetBlob(ctx context.Context, req *resource.GetBlobRe
|
||||
}
|
||||
|
||||
// Show resource history (and trash)
|
||||
func (s *sqlResourceServer) History(ctx context.Context, req *resource.HistoryRequest) (*resource.HistoryResponse, error) {
|
||||
func (s *sqlResourceServer) History(ctx context.Context, req *unified.HistoryRequest) (*unified.HistoryResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.History")
|
||||
defer span.End()
|
||||
|
||||
@@ -343,7 +352,7 @@ func (s *sqlResourceServer) History(ctx context.Context, req *resource.HistoryRe
|
||||
}
|
||||
|
||||
// Used for efficient provisioning
|
||||
func (s *sqlResourceServer) Origin(ctx context.Context, req *resource.OriginRequest) (*resource.OriginResponse, error) {
|
||||
func (s *sqlResourceServer) Origin(ctx context.Context, req *unified.OriginRequest) (*unified.OriginResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "storage_server.History")
|
||||
defer span.End()
|
||||
|
||||
@@ -9,11 +9,11 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/db"
|
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
|
||||
"github.com/grafana/grafana/pkg/services/store/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
)
|
||||
|
||||
func badRequest(msg string) *resource.StatusResult {
|
||||
return &resource.StatusResult{
|
||||
func badRequest(msg string) *unified.StatusResult {
|
||||
return &unified.StatusResult{
|
||||
Status: "Failure",
|
||||
Message: msg,
|
||||
Code: http.StatusBadRequest,
|
||||
@@ -4,14 +4,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/store/resource"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
)
|
||||
|
||||
func (s *sqlResourceServer) Watch(*resource.WatchRequest, resource.ResourceStore_WatchServer) error {
|
||||
func (s *sqlResourceServer) Watch(*unified.WatchRequest, unified.ResourceStore_WatchServer) error {
|
||||
return ErrNotImplementedYet
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) poller(stream chan *resource.WatchResponse) {
|
||||
func (s *sqlResourceServer) poller(stream chan *unified.WatchResponse) {
|
||||
var err error
|
||||
|
||||
since := int64(0)
|
||||
@@ -34,7 +34,7 @@ func (s *sqlResourceServer) poller(stream chan *resource.WatchResponse) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *sqlResourceServer) poll(since int64, out chan *resource.WatchResponse) (int64, error) {
|
||||
func (s *sqlResourceServer) poll(since int64, out chan *unified.WatchResponse) (int64, error) {
|
||||
ctx, span := s.tracer.Start(s.ctx, "storage_server.poll")
|
||||
defer span.End()
|
||||
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "poll"}))
|
||||
@@ -43,7 +43,7 @@ func (s *sqlResourceServer) poll(since int64, out chan *resource.WatchResponse)
|
||||
err := func() error {
|
||||
if false {
|
||||
// TODO
|
||||
out <- &resource.WatchResponse{}
|
||||
out <- &unified.WatchResponse{}
|
||||
}
|
||||
|
||||
// TODO, copy from entity store
|
||||
@@ -1,10 +1,10 @@
|
||||
version: v1
|
||||
plugins:
|
||||
- plugin: go
|
||||
out: pkg/storage/api
|
||||
out: pkg/storage/unified
|
||||
opt: paths=source_relative
|
||||
- plugin: go-grpc
|
||||
out: pkg/storage/api
|
||||
out: pkg/storage/unified
|
||||
opt:
|
||||
- paths=source_relative
|
||||
- require_unimplemented_servers=false
|
||||
30
pkg/storage/unified/client_wrapper.go
Normal file
30
pkg/storage/unified/client_wrapper.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package unified
|
||||
|
||||
import (
|
||||
"github.com/fullstorydev/grpchan"
|
||||
"github.com/fullstorydev/grpchan/inprocgrpc"
|
||||
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
grpcUtils "github.com/grafana/grafana/pkg/services/store/entity/grpc"
|
||||
)
|
||||
|
||||
func NewResourceStoreClientLocal(server ResourceStoreServer) ResourceStoreClient {
|
||||
channel := &inprocgrpc.Channel{}
|
||||
|
||||
auth := &grpcUtils.Authenticator{}
|
||||
|
||||
channel.RegisterService(
|
||||
grpchan.InterceptServer(
|
||||
&ResourceStore_ServiceDesc,
|
||||
grpcAuth.UnaryServerInterceptor(auth.Authenticate),
|
||||
grpcAuth.StreamServerInterceptor(auth.Authenticate),
|
||||
),
|
||||
server,
|
||||
)
|
||||
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
|
||||
}
|
||||
|
||||
func NewEntityStoreClientGRPC(channel *grpc.ClientConn) ResourceStoreClient {
|
||||
return NewResourceStoreClient(grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor))
|
||||
}
|
||||
37
pkg/storage/unified/deleted.go
Normal file
37
pkg/storage/unified/deleted.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package unified
|
||||
|
||||
import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
// This object is written when an object is deleted
|
||||
type DeletedMarker struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
metav1.ObjectMeta `json:"metadata,omitempty"`
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeletedMarker) DeepCopyInto(out *DeletedMarker) {
|
||||
*out = *in
|
||||
out.TypeMeta = in.TypeMeta
|
||||
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeletedMarker.
|
||||
func (in *DeletedMarker) DeepCopy() *DeletedMarker {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(DeletedMarker)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
|
||||
func (in *DeletedMarker) DeepCopyObject() runtime.Object {
|
||||
if c := in.DeepCopy(); c != nil {
|
||||
return c
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,11 +1,12 @@
|
||||
package api
|
||||
package unified
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
"github.com/grafana/grafana/pkg/services/auth/identity"
|
||||
)
|
||||
|
||||
type WriteEvent struct {
|
||||
@@ -14,7 +15,10 @@ type WriteEvent struct {
|
||||
Requester identity.Requester
|
||||
Operation ResourceOperation
|
||||
PreviousRV int64 // only for Update+Delete
|
||||
Value []byte
|
||||
|
||||
// The raw JSON payload
|
||||
// NOTE, this is never mutated, only parsed and validated
|
||||
Value json.RawMessage
|
||||
|
||||
Object utils.GrafanaMetaAccessor
|
||||
OldObject utils.GrafanaMetaAccessor
|
||||
@@ -1,8 +1,10 @@
|
||||
module github.com/grafana/grafana/pkg/storage/api
|
||||
module github.com/grafana/grafana/pkg/storage/unified
|
||||
|
||||
go 1.21.10
|
||||
|
||||
require (
|
||||
require (
|
||||
github.com/bwmarrin/snowflake v0.3.0
|
||||
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240409140820-518d3341d58f
|
||||
github.com/fullstorydev/grpchan v1.1.1
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
@@ -1,8 +1,12 @@
|
||||
package api
|
||||
package unified
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
)
|
||||
|
||||
// NamespacedPath is a path that can be used to isolate tenant data
|
||||
@@ -48,3 +52,20 @@ func (x *ResourceKey) WithoutResourceVersion() *ResourceKey {
|
||||
Name: x.Name,
|
||||
}
|
||||
}
|
||||
|
||||
func ResourceKeyFor(gr schema.GroupResource, obj metav1.Object) (*ResourceKey, error) {
|
||||
key := &ResourceKey{
|
||||
Group: gr.Group,
|
||||
Resource: gr.Resource,
|
||||
Namespace: obj.GetNamespace(),
|
||||
Name: obj.GetName(),
|
||||
}
|
||||
rv := obj.GetResourceVersion()
|
||||
if rv != "" {
|
||||
var err error
|
||||
key.ResourceVersion, err = strconv.ParseInt(rv, 10, 64)
|
||||
return key, err
|
||||
}
|
||||
return key, nil
|
||||
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,7 +1,7 @@
|
||||
syntax = "proto3";
|
||||
package api;
|
||||
package unified;
|
||||
|
||||
option go_package = "github.com/grafana/grafana/pkg/storage/api";
|
||||
option go_package = "github.com/grafana/grafana/pkg/storage/unified";
|
||||
|
||||
message ResourceKey {
|
||||
// Namespace (tenant)
|
||||
@@ -65,14 +65,17 @@ message ResourceMeta {
|
||||
|
||||
// Basic blob metadata
|
||||
message BlobInfo {
|
||||
// Content Length
|
||||
int64 size = 1;
|
||||
// System identifier
|
||||
string path = 1;
|
||||
|
||||
// MD5 digest of the body
|
||||
string ETag = 2;
|
||||
// Content Length
|
||||
int64 size = 2;
|
||||
|
||||
// Content type header
|
||||
string content_type = 3;
|
||||
|
||||
// Content hash used for an etag
|
||||
string hash = 4;
|
||||
}
|
||||
|
||||
// Status structure is copied from:
|
||||
@@ -97,8 +100,18 @@ message StatusResult {
|
||||
int32 code = 4;
|
||||
}
|
||||
|
||||
// TODO? support PresignedUrls for upload?
|
||||
message CreateBlob {
|
||||
message LinkBlob {
|
||||
enum Action {
|
||||
UNKNOWN = 0;
|
||||
// Upload raw bytes
|
||||
UPLOAD = 1;
|
||||
// Keep the existing blob (valid for updates)
|
||||
KEEP = 2;
|
||||
// Do not keep the existing version (same as not sending LinkBlob, only valid for updates)
|
||||
REMOVE = 3;
|
||||
// TODO... support presigned uploads
|
||||
}
|
||||
|
||||
// Content type header
|
||||
string content_type = 1;
|
||||
|
||||
@@ -123,7 +136,7 @@ message CreateRequest {
|
||||
string message = 3;
|
||||
|
||||
// Optionally include a large binary object
|
||||
CreateBlob blob = 4;
|
||||
LinkBlob blob = 4;
|
||||
}
|
||||
|
||||
message CreateResponse {
|
||||
@@ -146,7 +159,7 @@ message UpdateRequest {
|
||||
string message = 3;
|
||||
|
||||
// Optionally link a resource object
|
||||
CreateBlob blob = 4;
|
||||
LinkBlob blob = 4;
|
||||
}
|
||||
|
||||
message UpdateResponse {
|
||||
@@ -175,6 +188,9 @@ message DeleteResponse {
|
||||
|
||||
message GetResourceRequest {
|
||||
ResourceKey key = 1;
|
||||
|
||||
// Do not include any blob details
|
||||
bool ignore_blob = 2;
|
||||
}
|
||||
|
||||
message GetResourceResponse {
|
||||
@@ -200,7 +216,7 @@ message GetBlobResponse {
|
||||
// Status code
|
||||
StatusResult status = 1;
|
||||
|
||||
// Headers
|
||||
// Headers
|
||||
BlobInfo info = 2;
|
||||
|
||||
// The raw object value
|
||||
@@ -4,7 +4,7 @@
|
||||
// - protoc (unknown)
|
||||
// source: resource.proto
|
||||
|
||||
package api
|
||||
package unified
|
||||
|
||||
import (
|
||||
context "context"
|
||||
@@ -19,16 +19,16 @@ import (
|
||||
const _ = grpc.SupportPackageIsVersion8
|
||||
|
||||
const (
|
||||
ResourceStore_GetResource_FullMethodName = "/api.ResourceStore/GetResource"
|
||||
ResourceStore_Create_FullMethodName = "/api.ResourceStore/Create"
|
||||
ResourceStore_Update_FullMethodName = "/api.ResourceStore/Update"
|
||||
ResourceStore_Delete_FullMethodName = "/api.ResourceStore/Delete"
|
||||
ResourceStore_List_FullMethodName = "/api.ResourceStore/List"
|
||||
ResourceStore_Watch_FullMethodName = "/api.ResourceStore/Watch"
|
||||
ResourceStore_GetBlob_FullMethodName = "/api.ResourceStore/GetBlob"
|
||||
ResourceStore_History_FullMethodName = "/api.ResourceStore/History"
|
||||
ResourceStore_Origin_FullMethodName = "/api.ResourceStore/Origin"
|
||||
ResourceStore_IsHealthy_FullMethodName = "/api.ResourceStore/IsHealthy"
|
||||
ResourceStore_GetResource_FullMethodName = "/unified.ResourceStore/GetResource"
|
||||
ResourceStore_Create_FullMethodName = "/unified.ResourceStore/Create"
|
||||
ResourceStore_Update_FullMethodName = "/unified.ResourceStore/Update"
|
||||
ResourceStore_Delete_FullMethodName = "/unified.ResourceStore/Delete"
|
||||
ResourceStore_List_FullMethodName = "/unified.ResourceStore/List"
|
||||
ResourceStore_Watch_FullMethodName = "/unified.ResourceStore/Watch"
|
||||
ResourceStore_GetBlob_FullMethodName = "/unified.ResourceStore/GetBlob"
|
||||
ResourceStore_History_FullMethodName = "/unified.ResourceStore/History"
|
||||
ResourceStore_Origin_FullMethodName = "/unified.ResourceStore/Origin"
|
||||
ResourceStore_IsHealthy_FullMethodName = "/unified.ResourceStore/IsHealthy"
|
||||
)
|
||||
|
||||
// ResourceStoreClient is the client API for ResourceStore service.
|
||||
@@ -439,7 +439,7 @@ func _ResourceStore_IsHealthy_Handler(srv interface{}, ctx context.Context, dec
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var ResourceStore_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "api.ResourceStore",
|
||||
ServiceName: "unified.ResourceStore",
|
||||
HandlerType: (*ResourceStoreServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
@@ -1,16 +1,16 @@
|
||||
package api_test
|
||||
package unified_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/storage/api"
|
||||
"github.com/grafana/grafana/pkg/storage/unified"
|
||||
)
|
||||
|
||||
func TestResourceModels(t *testing.T) {
|
||||
t.Run("key namespaced path", func(t *testing.T) {
|
||||
key := &api.ResourceKey{}
|
||||
key := &unified.ResourceKey{}
|
||||
require.Equal(t, "__cluster__", key.NamespacedPath())
|
||||
|
||||
key.Namespace = "ns"
|
||||
@@ -1,26 +1,33 @@
|
||||
package api
|
||||
package unified
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/bwmarrin/snowflake"
|
||||
|
||||
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
||||
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext"
|
||||
"github.com/grafana/grafana/pkg/services/auth/identity"
|
||||
)
|
||||
|
||||
// Verify that all required fields are set, and the user has permission to set the common metadata fields
|
||||
type EventValidator interface {
|
||||
PrepareCreate(ctx context.Context, req *CreateRequest) (*WriteEvent, error)
|
||||
PrepareUpdate(ctx context.Context, req *UpdateRequest, current []byte) (*WriteEvent, error)
|
||||
PrepareUpdate(ctx context.Context, req *UpdateRequest, current *GetResourceResponse) (*WriteEvent, error)
|
||||
PrepareDelete(ctx context.Context, req *DeleteRequest, current *GetResourceResponse) (*WriteEvent, error)
|
||||
}
|
||||
|
||||
type EventValidatorOptions struct {
|
||||
// Get the next EventID
|
||||
// When running in a cluster, each node should have a different ID
|
||||
// This is used for snowflake generation and log identification
|
||||
NodeID int64
|
||||
|
||||
// Get the next EventID. When not set, this will be a snowflake ID
|
||||
NextEventID func() int64
|
||||
|
||||
// Check if a user has access to write folders
|
||||
@@ -37,9 +44,16 @@ type eventValidator struct {
|
||||
|
||||
func NewEventValidator(opts EventValidatorOptions) EventValidator {
|
||||
if opts.NextEventID == nil {
|
||||
counter := atomic.Int64{}
|
||||
opts.NextEventID = func() int64 {
|
||||
return counter.Add(1)
|
||||
rvGenerationNode, err := snowflake.NewNode(opts.NodeID)
|
||||
if err == nil {
|
||||
opts.NextEventID = func() int64 {
|
||||
return rvGenerationNode.Generate().Int64()
|
||||
}
|
||||
} else {
|
||||
counter := atomic.Int64{}
|
||||
opts.NextEventID = func() int64 {
|
||||
return counter.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
return &eventValidator{opts}
|
||||
@@ -59,7 +73,7 @@ func (v *eventValidator) newEvent(ctx context.Context, key *ResourceKey, value,
|
||||
Key: key,
|
||||
Value: value,
|
||||
}
|
||||
event.Requester, err = appcontext.User(ctx)
|
||||
event.Requester, err = identity.GetRequester(ctx)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "unable to get user")
|
||||
}
|
||||
@@ -178,9 +192,13 @@ func (v *eventValidator) PrepareCreate(ctx context.Context, req *CreateRequest)
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (v *eventValidator) PrepareUpdate(ctx context.Context, req *UpdateRequest, current []byte) (*WriteEvent, error) {
|
||||
event := v.newEvent(ctx, req.Key, req.Value, current)
|
||||
func (v *eventValidator) PrepareUpdate(ctx context.Context, req *UpdateRequest, current *GetResourceResponse) (*WriteEvent, error) {
|
||||
event := v.newEvent(ctx, req.Key, req.Value, current.Value)
|
||||
event.Operation = ResourceOperation_UPDATED
|
||||
event.PreviousRV = current.ResourceVersion
|
||||
if current.Value == nil {
|
||||
return event.BadRequest(nil, "current value does not exist"), nil
|
||||
}
|
||||
if event.Status != nil {
|
||||
return event, nil
|
||||
}
|
||||
@@ -189,8 +207,50 @@ func (v *eventValidator) PrepareUpdate(ctx context.Context, req *UpdateRequest,
|
||||
//----------------------------------------
|
||||
val := event.Object.GetUpdatedBy()
|
||||
if val != "" && val != event.Requester.GetUID().String() {
|
||||
return event.BadRequest(nil, "created by annotation does not match: metadata.annotations#"+utils.AnnoKeyUpdatedBy), nil
|
||||
return event.BadRequest(nil, "updated by annotation does not match: metadata.annotations#"+utils.AnnoKeyUpdatedBy), nil
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (v *eventValidator) PrepareDelete(ctx context.Context, req *DeleteRequest, current *GetResourceResponse) (*WriteEvent, error) {
|
||||
now := metav1.NewTime(time.Now())
|
||||
var err error
|
||||
event := &WriteEvent{
|
||||
EventID: v.opts.NextEventID(),
|
||||
Key: req.Key,
|
||||
Operation: ResourceOperation_DELETED,
|
||||
PreviousRV: current.ResourceVersion,
|
||||
}
|
||||
if event.PreviousRV != req.Key.ResourceVersion {
|
||||
return event.BadRequest(err, "deletion request does not match current revision (%d != %d)", req.Key.ResourceVersion, event.PreviousRV), nil
|
||||
}
|
||||
event.Requester, err = identity.GetRequester(ctx)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "unable to get user"), nil
|
||||
}
|
||||
marker := &DeletedMarker{}
|
||||
err = json.Unmarshal(current.Value, marker)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to read previous object, %w", err)
|
||||
}
|
||||
event.Object, err = utils.MetaAccessor(marker)
|
||||
if err != nil {
|
||||
return event.BadRequest(err, "unable to read marker object"), nil
|
||||
}
|
||||
event.Object.SetDeletionTimestamp(&now)
|
||||
event.Object.SetUpdatedTimestamp(&now.Time)
|
||||
event.Object.SetManagedFields(nil)
|
||||
event.Object.SetFinalizers(nil)
|
||||
event.Object.SetUpdatedBy(event.Requester.GetUID().String())
|
||||
marker.TypeMeta = metav1.TypeMeta{
|
||||
Kind: "DeletedMarker",
|
||||
APIVersion: "storage.grafana.app/v0alpha1", // ?? or can we stick this in common?
|
||||
}
|
||||
marker.Annotations["RestoreResourceVersion"] = fmt.Sprintf("%d", event.PreviousRV)
|
||||
event.Value, err = json.Marshal(marker)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable creating deletion marker, %w", err)
|
||||
}
|
||||
return event, nil
|
||||
}
|
||||
Reference in New Issue
Block a user