basic queries

This commit is contained in:
Ryan McKinley 2024-06-11 23:01:40 +03:00
parent 2e811c5438
commit ac39953d0e
23 changed files with 5955 additions and 2 deletions

View File

@ -259,6 +259,11 @@ func (s *service) start(ctx context.Context) error {
return err
}
err = eDB.Init()
if err != nil {
return err
}
storeServer, err := sqlstash.ProvideSQLEntityServer(eDB, s.tracing)
if err != nil {
return err

View File

@ -7,6 +7,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
// Annotation keys
@ -46,6 +47,9 @@ type ResourceOriginInfo struct {
// Accessor functions for k8s objects
type GrafanaResourceMetaAccessor interface {
metav1.Object
metav1.Type
GetUpdatedTimestamp() (*time.Time, error)
SetUpdatedTimestamp(v *time.Time)
SetUpdatedTimestampMillis(unix int64)
@ -79,6 +83,7 @@ var _ GrafanaResourceMetaAccessor = (*grafanaResourceMetaAccessor)(nil)
type grafanaResourceMetaAccessor struct {
raw interface{} // the original object (it implements metav1.Object)
obj metav1.Object
typ metav1.Type
}
// Accessor takes an arbitrary object pointer and returns meta.Interface.
@ -90,7 +95,18 @@ func MetaAccessor(raw interface{}) (GrafanaResourceMetaAccessor, error) {
if err != nil {
return nil, err
}
return &grafanaResourceMetaAccessor{raw, obj}, nil
typ, ok := raw.(metav1.Type)
if !ok {
typ, ok = obj.(metav1.Type)
}
if !ok {
return nil, fmt.Errorf("expecting the object to implement metav1.Type")
}
return &grafanaResourceMetaAccessor{raw, obj, typ}, nil
}
func (m *grafanaResourceMetaAccessor) Object() metav1.Object {
return m.obj
}
func (m *grafanaResourceMetaAccessor) set(key string, val string) {
@ -240,6 +256,172 @@ func (m *grafanaResourceMetaAccessor) GetOriginTimestamp() (*time.Time, error) {
return &t, nil
}
// GetAnnotations implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetAnnotations() map[string]string {
return m.obj.GetAnnotations()
}
// GetCreationTimestamp implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetCreationTimestamp() metav1.Time {
return m.obj.GetCreationTimestamp()
}
// GetDeletionGracePeriodSeconds implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetDeletionGracePeriodSeconds() *int64 {
return m.obj.GetDeletionGracePeriodSeconds()
}
// GetDeletionTimestamp implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetDeletionTimestamp() *metav1.Time {
return m.obj.GetDeletionTimestamp()
}
// GetFinalizers implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetFinalizers() []string {
return m.obj.GetFinalizers()
}
// GetGenerateName implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetGenerateName() string {
return m.obj.GetGenerateName()
}
// GetGeneration implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetGeneration() int64 {
return m.obj.GetGeneration()
}
// GetLabels implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetLabels() map[string]string {
return m.obj.GetLabels()
}
// GetManagedFields implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetManagedFields() []metav1.ManagedFieldsEntry {
return m.obj.GetManagedFields()
}
// GetName implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetName() string {
return m.obj.GetName()
}
// GetNamespace implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetNamespace() string {
return m.obj.GetNamespace()
}
// GetOwnerReferences implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetOwnerReferences() []metav1.OwnerReference {
return m.obj.GetOwnerReferences()
}
// GetResourceVersion implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetResourceVersion() string {
return m.obj.GetResourceVersion()
}
// GetSelfLink implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetSelfLink() string {
return m.obj.GetSelfLink()
}
// GetUID implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) GetUID() types.UID {
return m.obj.GetUID()
}
// SetAnnotations implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetAnnotations(annotations map[string]string) {
m.obj.SetAnnotations(annotations)
}
// SetCreationTimestamp implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetCreationTimestamp(timestamp metav1.Time) {
m.obj.SetCreationTimestamp(timestamp)
}
// SetDeletionGracePeriodSeconds implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetDeletionGracePeriodSeconds(v *int64) {
m.obj.SetDeletionGracePeriodSeconds(v)
}
// SetDeletionTimestamp implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetDeletionTimestamp(timestamp *metav1.Time) {
m.obj.SetDeletionTimestamp(timestamp)
}
// SetFinalizers implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetFinalizers(finalizers []string) {
m.obj.SetFinalizers(finalizers)
}
// SetGenerateName implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetGenerateName(name string) {
m.obj.SetGenerateName(name)
}
// SetGeneration implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetGeneration(generation int64) {
m.obj.SetGeneration(generation)
}
// SetLabels implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetLabels(labels map[string]string) {
m.obj.SetLabels(labels)
}
// SetManagedFields implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetManagedFields(managedFields []metav1.ManagedFieldsEntry) {
m.obj.SetManagedFields(managedFields)
}
// SetName implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetName(name string) {
m.obj.SetName(name)
}
// SetNamespace implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetNamespace(namespace string) {
m.obj.SetNamespace(namespace)
}
// SetOwnerReferences implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetOwnerReferences(v []metav1.OwnerReference) {
m.obj.SetOwnerReferences(v)
}
// SetResourceVersion implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetResourceVersion(version string) {
m.obj.SetResourceVersion(version)
}
// SetSelfLink implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetSelfLink(selfLink string) {
m.obj.SetSelfLink(selfLink)
}
// SetUID implements GrafanaResourceMetaAccessor.
func (m *grafanaResourceMetaAccessor) SetUID(uid types.UID) {
m.obj.SetUID(uid)
}
func (m *grafanaResourceMetaAccessor) GetAPIVersion() string {
return m.typ.GetAPIVersion()
}
func (m *grafanaResourceMetaAccessor) SetAPIVersion(version string) {
m.typ.SetAPIVersion(version)
}
func (m *grafanaResourceMetaAccessor) GetKind() string {
return m.typ.GetKind()
}
func (m *grafanaResourceMetaAccessor) SetKind(kind string) {
m.typ.SetKind(kind)
}
func (m *grafanaResourceMetaAccessor) FindTitle(defaultTitle string) string {
// look for Spec.Title or Spec.Name
r := reflect.ValueOf(m.raw)
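A quick usage sketch for the extended accessor (illustrative only, written as if it lived in the same package as MetaAccessor; the apiVersion/kind values are placeholders and an import of k8s.io/apimachinery/pkg/apis/meta/v1/unstructured is assumed). An unstructured.Unstructured value satisfies both metav1.Object and metav1.Type, so the new type assertion in MetaAccessor succeeds and kind/apiVersion become readable through the accessor:

// Illustrative sketch only -- not part of this commit.
func exampleMetaAccessor() error {
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "example.grafana.app/v0alpha1", // placeholder group/version
		"kind":       "Example",                      // placeholder kind
		"metadata": map[string]interface{}{
			"name":      "demo",
			"namespace": "default",
		},
	}}
	meta, err := MetaAccessor(u)
	if err != nil {
		return err
	}
	// Both the metav1.Object and the new metav1.Type methods are available.
	fmt.Printf("%s %s %s\n", meta.GetAPIVersion(), meta.GetKind(), meta.GetName())
	return nil
}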

View File

@ -16,9 +16,19 @@ func MigrateEntityStore(engine *xorm.Engine, cfg *setting.Cfg, features featurem
mg := migrator.NewScopedMigrator(engine, cfg, "entity")
mg.AddCreateMigration()
initEntityTables(mg)
// Only in development for now!!! When we are ready, we can drop entity and use this
if cfg.Env == setting.Dev {
m2 := migrator.NewScopedMigrator(engine, cfg, "resource")
m2.AddCreateMigration()
initResourceTables(m2)
err := m2.Start(true, 0)
if err != nil {
return err
}
}
// since it's a new feature enable migration locking by default
return mg.Start(true, 0)
}

View File

@ -0,0 +1,151 @@
package migrations
import (
"fmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
func initResourceTables(mg *migrator.Migrator) string {
marker := "Initialize resource tables (v0)" // changing this key wipe+rewrite everything
mg.AddMigration(marker, &migrator.RawSQLMigration{})
tables := []migrator.Table{}
// This table helps support incrementing the resource version within a group+resource
tables = append(tables, migrator.Table{
Name: "resource_version",
Columns: []*migrator.Column{
{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "rv", Type: migrator.DB_BigInt, Nullable: false}, // resource version
},
Indices: []*migrator.Index{
{Cols: []string{"group", "resource"}, Type: migrator.UniqueIndex},
},
})
tables = append(tables, migrator.Table{
Name: "resource", // write only log? all events
Columns: []*migrator.Column{
// SnowflakeID -- Each Create/Update/Delete call is an event
// Using a snowflake ID also lets this field double as an approximate timestamp
{Name: "event", Type: migrator.DB_BigInt, Nullable: false, IsPrimaryKey: true},
// This will be null on insert, and then updated once we are ready to commit the transaction
{Name: "rv", Type: migrator.DB_BigInt, Nullable: true},
{Name: "previous_rv", Type: migrator.DB_BigInt, Nullable: true}, // needed?
// Allows fast search for the first page in any query.
// Subsequent pages must use MAX(rv) AND is_compacted=false GROUP ...
{Name: "is_current", Type: migrator.DB_Bool, Nullable: false},
// Indicates that this is no longer the current version
// This value is updated every few minutes and makes the paged queries more efficient
{Name: "is_compacted", Type: migrator.DB_Bool, Nullable: false},
// Properties that exist in path/key (and duplicated in the json value)
{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "api_version", Type: migrator.DB_NVarchar, Length: 32, Nullable: false},
{Name: "namespace", Type: migrator.DB_NVarchar, Length: 63, Nullable: true}, // namespace is not required (cluster scope)
{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "name", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
// The operation that wrote this resource version
// 1: created, 2: updated, 3: deleted
{Name: "operation", Type: migrator.DB_Int, Nullable: false},
// Optional Commit message (currently only used for dashboards)
{Name: "message", Type: migrator.DB_Text, Nullable: false}, // defaults to empty string
// The k8s resource JSON text (without the resourceVersion populated)
{Name: "value", Type: migrator.DB_MediumText, Nullable: false},
// Content hash -- this is appropriate to use for an etag value
{Name: "hash", Type: migrator.DB_NVarchar, Length: 32, Nullable: false},
// Path to linked blob (or null). This blob may be saved in SQL, or in an object store
{Name: "blob_path", Type: migrator.DB_NVarchar, Length: 1024, Nullable: true},
},
Indices: []*migrator.Index{
{Cols: []string{"rv"}, Type: migrator.UniqueIndex},
{Cols: []string{"is_current"}, Type: migrator.IndexType},
{Cols: []string{"is_compacted"}, Type: migrator.IndexType},
{Cols: []string{"operation"}, Type: migrator.IndexType},
{Cols: []string{"namespace"}, Type: migrator.IndexType},
{Cols: []string{"group", "resource", "name"}, Type: migrator.IndexType},
{Cols: []string{"blob_path"}, Type: migrator.IndexType},
},
})
// The values in this table are created by parsing the value JSON and writing these as searchable columns
// These *could* be in the same table, but this structure allows us to replace the table by first
// building a parallel structure, then swapping them... maybe :)
tables = append(tables, migrator.Table{
Name: "resource_meta", // write only log? all events
Columns: []*migrator.Column{
{Name: "event", Type: migrator.DB_BigInt, Nullable: false, IsPrimaryKey: true},
// Hashed label set
{Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: true}, // null is no labels
// Helpful filters
{Name: "folder", Type: migrator.DB_NVarchar, Length: 190, Nullable: true}, // uid of folder
// For sorting values come from metadata.annotations#grafana.app/*
{Name: "created_at", Type: migrator.DB_BigInt, Nullable: false},
{Name: "updated_at", Type: migrator.DB_BigInt, Nullable: false},
// Origin metadata helps implement efficient provisioning checks
{Name: "origin", Type: migrator.DB_NVarchar, Length: 64, Nullable: true}, // The origin name
{Name: "origin_path", Type: migrator.DB_Text, Nullable: true}, // Path to resource
{Name: "origin_hash", Type: migrator.DB_NVarchar, Length: 128, Nullable: true}, // Origin hash
{Name: "origin_ts", Type: migrator.DB_BigInt, Nullable: true}, // Origin timestamp
},
Indices: []*migrator.Index{
{Cols: []string{"event"}, Type: migrator.IndexType},
{Cols: []string{"folder"}, Type: migrator.IndexType},
{Cols: []string{"created_at"}, Type: migrator.IndexType},
{Cols: []string{"updated_at"}, Type: migrator.IndexType},
{Cols: []string{"origin"}, Type: migrator.IndexType},
},
})
// This table is optional -- values can be saved in blob storage
tables = append(tables, migrator.Table{
Name: "resource_blob", // even things that failed?
Columns: []*migrator.Column{
{Name: "path", Type: migrator.DB_NVarchar, Length: 1024, Nullable: false, IsPrimaryKey: true},
{Name: "body", Type: migrator.DB_Blob, Nullable: false},
{Name: "etag", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
{Name: "size", Type: migrator.DB_BigInt, Nullable: false},
{Name: "content_type", Type: migrator.DB_NVarchar, Length: 255, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"path"}, Type: migrator.UniqueIndex},
},
})
tables = append(tables, migrator.Table{
Name: "resource_label_set",
Columns: []*migrator.Column{
{Name: "label_set", Type: migrator.DB_NVarchar, Length: 64, Nullable: false},
{Name: "label", Type: migrator.DB_NVarchar, Length: 190, Nullable: false},
{Name: "value", Type: migrator.DB_Text, Nullable: false},
},
Indices: []*migrator.Index{
{Cols: []string{"label_set", "label"}, Type: migrator.UniqueIndex},
},
})
// Initialize all tables
for t := range tables {
mg.AddMigration("drop table "+tables[t].Name, migrator.NewDropTableMigration(tables[t].Name))
mg.AddMigration("create table "+tables[t].Name, migrator.NewAddTableMigration(tables[t]))
for i := range tables[t].Indices {
mg.AddMigration(fmt.Sprintf("create table %s, index: %d", tables[t].Name, i), migrator.NewAddIndexMigration(tables[t], tables[t].Indices[i]))
}
}
return marker
}
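Because the event column stores a snowflake-style ID, it doubles as an approximate write timestamp. A small illustration of that claim, assuming the classic layout of a millisecond timestamp in the high bits above 22 worker/sequence bits and a custom epoch (neither the exact layout nor the epoch used by the service is shown in this diff):

package main

import (
	"fmt"
	"time"
)

// approximateTime recovers a rough creation time from a snowflake-style ID.
// Assumption for illustration: millisecond timestamp in the high bits,
// 22 low bits of worker/sequence, measured from a custom epoch.
func approximateTime(event int64, epoch time.Time) time.Time {
	ms := event >> 22 // drop the worker and sequence bits
	return epoch.Add(time.Duration(ms) * time.Millisecond)
}

func main() {
	epoch := time.Date(2010, 11, 4, 1, 42, 54, 0, time.UTC) // Twitter's snowflake epoch, as an example
	fmt.Println(approximateTime(1541815603606036480, epoch))
}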

View File

@ -106,6 +106,11 @@ func (s *sqlEntityServer) init() error {
return errors.New("missing db")
}
err := s.db.Init()
if err != nil {
return err
}
sqlDB, err := s.db.GetDB()
if err != nil {
return err

View File

@ -0,0 +1,10 @@
version: v1
plugins:
- plugin: go
out: pkg/services/store/resource
opt: paths=source_relative
- plugin: go-grpc
out: pkg/services/store/resource
opt:
- paths=source_relative
- require_unimplemented_servers=false

View File

@ -0,0 +1,7 @@
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULT

File diff suppressed because it is too large

View File

@ -0,0 +1,416 @@
syntax = "proto3";
package resource;
option go_package = "github.com/grafana/grafana/pkg/services/store/resource";
message ResourceIdentifier {
// Namespace (tenant)
string namespace = 2;
// Resource Group
string group = 1;
// The resource type
string resource = 3;
// Resource identifier (unique within namespace+group+resource)
string name = 4;
// The resource version
int64 resource_version = 5;
}
enum ResourceOperation {
UNKNOWN = 0;
CREATED = 1;
UPDATED = 2;
DELETED = 3;
BOOKMARK = 4;
}
message ResourceWrapper {
// The resource version
int64 resource_version = 1;
// Full kubernetes json bytes (although the resource version may not be accurate)
bytes value = 2;
// Operation
ResourceOperation operation = 3;
// The resource has an attached blob
bool has_blob = 4;
}
// The history and trash commands need access to commit messages
message ResourceMeta {
// The resource version
int64 resource_version = 1;
// The operation that wrote this resource version
ResourceOperation operation = 2;
// Size of the full resource body
int32 size = 3;
// Hash for the resource
string hash = 4;
// The optional commit message
string message = 5;
// The kubernetes metadata section (not the full resource)
// https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L111
bytes object_meta = 6;
// The resource has an attached blob
bool has_blob = 7;
}
// Basic blob metadata
message BlobInfo {
// Content Length
int64 size = 1;
// MD5 digest of the body
string ETag = 2;
// Content type header
string content_type = 3;
}
// Status structure is copied from:
// https://github.com/kubernetes/apimachinery/blob/v0.30.1/pkg/apis/meta/v1/generated.proto#L979
message StatusResult {
// Status of the operation.
// One of: "Success" or "Failure".
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
// +optional
string status = 1;
// A human-readable description of the status of this operation.
// +optional
string message = 2;
// A machine-readable description of why this operation is in the
// "Failure" status. If this value is empty there
// is no information available. A Reason clarifies an HTTP status
// code but does not override it.
// +optional
string reason = 3;
// Suggested HTTP return code for this status, 0 if not set.
// +optional
int32 code = 4;
}
// TODO? support PresignedUrls for upload?
message CreateBlob {
// Content type header
string content_type = 1;
// Raw value to write
bytes value = 2;
}
// ----------------------------------
// CRUD Objects
// ----------------------------------
message CreateRequest {
// Requires group+resource to be configured
// If name is not set, a unique name will be generated
// The resourceVersion should not be set
ResourceIdentifier key = 1;
// The resource JSON.
bytes value = 2;
// Optional commit message
string message = 3;
// Optionally include a large binary object
CreateBlob blob = 4;
}
message CreateResponse {
// Status code
StatusResult status = 1;
// The updated resource version
int64 resource_version = 2;
}
message UpdateRequest {
// Full key must be set
ResourceIdentifier key = 1;
// The resource JSON.
bytes value = 2;
// Optional commit message
// +optional
string message = 3;
// Optionally link a resource object
CreateBlob blob = 4;
}
message UpdateResponse {
// Status code
StatusResult status = 1;
// The updated resource version
int64 resource_version = 2;
}
message DeleteRequest {
ResourceIdentifier key = 1;
// Preconditions: make sure the uid matches the current saved value
// +optional
string uid = 2;
}
message DeleteResponse {
// Status code
StatusResult status = 1;
// The new resource version
int64 resource_version = 2;
}
message GetResourceRequest {
ResourceIdentifier key = 1;
}
message GetResourceResponse {
// Status code
StatusResult status = 1;
// The new resource version
int64 resource_version = 2;
// The properties
bytes value = 3;
// A Signed URL that will let you fetch the blob
// If this value starts with # you must read the bytes using the GetBlob request
string blob_url = 4;
}
message GetBlobRequest {
ResourceIdentifier key = 1;
}
message GetBlobResponse {
// Status code
StatusResult status = 1;
// Headers
BlobInfo info = 2;
// The raw object value
bytes value = 3;
}
// ----------------------------------
// List Request/Response
// ----------------------------------
// The label filtering requirements:
// https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/labels/selector.go#L141
message Requirement {
string key = 1;
string operator = 2; // See https://github.com/kubernetes/kubernetes/blob/v1.30.1/staging/src/k8s.io/apimachinery/pkg/selection/operator.go#L21
repeated string values = 3; // typically one value, but depends on the operator
}
message Sort {
enum Order {
ASC = 0;
DESC = 1;
}
string field = 1;
Order order = 2;
}
message ListOptions {
// Maximum number of items to return
// NOTE responses will also be limited by the response payload size
int64 limit = 2;
// Namespace+Group+Resource+etc
ResourceIdentifier key = 3;
// Match label
repeated Requirement labels = 4;
// Match fields (not yet supported)
repeated Requirement fields = 5;
// Limit results to items in a specific folder (not a query for everything under that folder!)
string folder = 6;
}
message ListRequest {
// Starting from the requested page (other query parameters must match!)
string next_page_token = 1;
// Filtering
ListOptions options = 2;
// Sorting instructions `field ASC/DESC`
repeated Sort sort = 3;
}
message ListResponse {
repeated ResourceWrapper items = 1;
// When more results exist, pass this in the next request
string next_page_token = 2;
// ResourceVersion of the list response
int64 resource_version = 3;
// remainingItemCount is the number of subsequent items in the list which are not included in this
// list response. If the list request contained label or field selectors, then the number of
// remaining items is unknown and the field will be left unset and omitted during serialization.
// If the list is complete (either because it is not chunking or because this is the last chunk),
// then there are no more remaining items and this field will be left unset and omitted during
// serialization.
//
// The intended use of the remainingItemCount is *estimating* the size of a collection. Clients
// should not rely on the remainingItemCount to be set or to be exact.
// +optional
int64 remaining_item_count = 4; // 0 won't be set either (no next page token)
}
message WatchRequest {
// ResourceVersion of the last change seen. Zero (unset) defaults to the full history
int64 since = 1;
// Watch specific entities
ResourceIdentifier key = 2;
// Additional options
ListOptions options = 3;
// Return initial events
bool send_initial_events = 4;
// When done with initial events, send a bookmark event
bool allow_watch_bookmarks = 5;
}
message WatchResponse {
// Timestamp the event was sent
int64 timestamp = 1;
// Entity that was created, updated, or deleted
ResourceWrapper resource = 2;
// previous version of the entity (in update+delete events)
ResourceWrapper previous = 3;
}
message HistoryRequest {
// Starting from the requested page (other query parameters must match!)
string next_page_token = 1;
// Maximum number of items to return
int64 limit = 2;
// Resource identifier
ResourceIdentifier key = 3;
// List the deleted values (eg, show trash)
bool show_deleted = 4;
}
message HistoryResponse {
repeated ResourceMeta items = 1;
// More results exist... pass this in the next request
string next_page_token = 2;
// ResourceVersion of the list response
int64 resource_version = 3;
}
message OriginRequest {
// Starting from the requested page (other query parameters must match!)
string next_page_token = 1;
// Maximum number of items to return
int64 limit = 2;
// Resource identifier
ResourceIdentifier key = 3;
// Only list resources from this origin
string origin = 4;
}
message ResourceOriginInfo {
// The resource
ResourceIdentifier key = 1;
// Size of the full resource body
int32 resource_size = 2;
// Hash for the resource
string resource_hash = 3;
// The origin name
string origin = 4;
// Path on the origin
string path = 5;
// Verification hash from the origin
string hash = 6;
// Change time from the origin
int64 timestamp = 7;
}
message OriginResponse {
repeated ResourceOriginInfo items = 1;
// More results exist... pass this in the next request
string next_page_token = 2;
// ResourceVersion of the list response
int64 resource_version = 3;
}
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3; // Used only by the Watch method.
}
ServingStatus status = 1;
}
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
service ResourceStore {
rpc GetResource(GetResourceRequest) returns (GetResourceResponse);
rpc Create(CreateRequest) returns (CreateResponse);
rpc Update(UpdateRequest) returns (UpdateResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc List(ListRequest) returns (ListResponse);
rpc Watch(WatchRequest) returns (stream WatchResponse);
// Get the raw blob bytes and metadata
rpc GetBlob(GetBlobRequest) returns (GetBlobResponse);
// Show resource history (and trash)
rpc History(HistoryRequest) returns (HistoryResponse);
// Used for efficient provisioning
rpc Origin(OriginRequest) returns (OriginResponse);
// Check if the service is healthy
rpc IsHealthy(HealthCheckRequest) returns (HealthCheckResponse);
}
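To make the list/paging contract concrete, here is a minimal client-side sketch using the Go client generated from this file (shown further down). The address, namespace, group, and resource values are placeholders; real callers are wired up inside Grafana rather than dialing over TCP like this:

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/grafana/grafana/pkg/services/store/resource"
)

func main() {
	conn, err := grpc.Dial("localhost:10000", grpc.WithTransportCredentials(insecure.NewCredentials())) // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := resource.NewResourceStoreClient(conn)
	req := &resource.ListRequest{
		Options: &resource.ListOptions{
			Key: &resource.ResourceIdentifier{
				Namespace: "default",             // placeholder
				Group:     "example.grafana.app", // placeholder
				Resource:  "examples",            // placeholder
			},
			Limit: 100,
		},
	}
	for {
		rsp, err := client.List(context.Background(), req)
		if err != nil {
			log.Fatal(err)
		}
		for _, item := range rsp.Items {
			log.Printf("rv=%d bytes=%d", item.ResourceVersion, len(item.Value))
		}
		// An empty next_page_token means the listing is complete.
		if rsp.NextPageToken == "" {
			break
		}
		req.NextPageToken = rsp.NextPageToken
	}
}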

View File

@ -0,0 +1,490 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.4.0
// - protoc (unknown)
// source: resource.proto
package resource
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
const (
ResourceStore_GetResource_FullMethodName = "/resource.ResourceStore/GetResource"
ResourceStore_Create_FullMethodName = "/resource.ResourceStore/Create"
ResourceStore_Update_FullMethodName = "/resource.ResourceStore/Update"
ResourceStore_Delete_FullMethodName = "/resource.ResourceStore/Delete"
ResourceStore_List_FullMethodName = "/resource.ResourceStore/List"
ResourceStore_Watch_FullMethodName = "/resource.ResourceStore/Watch"
ResourceStore_GetBlob_FullMethodName = "/resource.ResourceStore/GetBlob"
ResourceStore_History_FullMethodName = "/resource.ResourceStore/History"
ResourceStore_Origin_FullMethodName = "/resource.ResourceStore/Origin"
ResourceStore_IsHealthy_FullMethodName = "/resource.ResourceStore/IsHealthy"
)
// ResourceStoreClient is the client API for ResourceStore service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
type ResourceStoreClient interface {
GetResource(ctx context.Context, in *GetResourceRequest, opts ...grpc.CallOption) (*GetResourceResponse, error)
Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error)
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error)
// Get the raw blob bytes and metadata
GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error)
// Show resource history (and trash)
History(ctx context.Context, in *HistoryRequest, opts ...grpc.CallOption) (*HistoryResponse, error)
// Used for efficient provisioning
Origin(ctx context.Context, in *OriginRequest, opts ...grpc.CallOption) (*OriginResponse, error)
// Check if the service is healthy
IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error)
}
type resourceStoreClient struct {
cc grpc.ClientConnInterface
}
func NewResourceStoreClient(cc grpc.ClientConnInterface) ResourceStoreClient {
return &resourceStoreClient{cc}
}
func (c *resourceStoreClient) GetResource(ctx context.Context, in *GetResourceRequest, opts ...grpc.CallOption) (*GetResourceResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetResourceResponse)
err := c.cc.Invoke(ctx, ResourceStore_GetResource_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(CreateResponse)
err := c.cc.Invoke(ctx, ResourceStore_Create_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(UpdateResponse)
err := c.cc.Invoke(ctx, ResourceStore_Update_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(DeleteResponse)
err := c.cc.Invoke(ctx, ResourceStore_Delete_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListResponse)
err := c.cc.Invoke(ctx, ResourceStore_List_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &ResourceStore_ServiceDesc.Streams[0], ResourceStore_Watch_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &resourceStoreWatchClient{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type ResourceStore_WatchClient interface {
Recv() (*WatchResponse, error)
grpc.ClientStream
}
type resourceStoreWatchClient struct {
grpc.ClientStream
}
func (x *resourceStoreWatchClient) Recv() (*WatchResponse, error) {
m := new(WatchResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *resourceStoreClient) GetBlob(ctx context.Context, in *GetBlobRequest, opts ...grpc.CallOption) (*GetBlobResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetBlobResponse)
err := c.cc.Invoke(ctx, ResourceStore_GetBlob_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) History(ctx context.Context, in *HistoryRequest, opts ...grpc.CallOption) (*HistoryResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HistoryResponse)
err := c.cc.Invoke(ctx, ResourceStore_History_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) Origin(ctx context.Context, in *OriginRequest, opts ...grpc.CallOption) (*OriginResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(OriginResponse)
err := c.cc.Invoke(ctx, ResourceStore_Origin_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) IsHealthy(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, ResourceStore_IsHealthy_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// ResourceStoreServer is the server API for ResourceStore service.
// All implementations should embed UnimplementedResourceStoreServer
// for forward compatibility
//
// The entity store provides a basic CRUD (+watch eventually) interface for generic entities
type ResourceStoreServer interface {
GetResource(context.Context, *GetResourceRequest) (*GetResourceResponse, error)
Create(context.Context, *CreateRequest) (*CreateResponse, error)
Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
List(context.Context, *ListRequest) (*ListResponse, error)
Watch(*WatchRequest, ResourceStore_WatchServer) error
// Get the raw blob bytes and metadata
GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error)
// Show resource history (and trash)
History(context.Context, *HistoryRequest) (*HistoryResponse, error)
// Used for efficient provisioning
Origin(context.Context, *OriginRequest) (*OriginResponse, error)
// Check if the service is healthy
IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
}
// UnimplementedResourceStoreServer should be embedded to have forward compatible implementations.
type UnimplementedResourceStoreServer struct {
}
func (UnimplementedResourceStoreServer) GetResource(context.Context, *GetResourceRequest) (*GetResourceResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetResource not implemented")
}
func (UnimplementedResourceStoreServer) Create(context.Context, *CreateRequest) (*CreateResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Create not implemented")
}
func (UnimplementedResourceStoreServer) Update(context.Context, *UpdateRequest) (*UpdateResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Update not implemented")
}
func (UnimplementedResourceStoreServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedResourceStoreServer) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedResourceStoreServer) Watch(*WatchRequest, ResourceStore_WatchServer) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
func (UnimplementedResourceStoreServer) GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetBlob not implemented")
}
func (UnimplementedResourceStoreServer) History(context.Context, *HistoryRequest) (*HistoryResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method History not implemented")
}
func (UnimplementedResourceStoreServer) Origin(context.Context, *OriginRequest) (*OriginResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Origin not implemented")
}
func (UnimplementedResourceStoreServer) IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method IsHealthy not implemented")
}
// UnsafeResourceStoreServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ResourceStoreServer will
// result in compilation errors.
type UnsafeResourceStoreServer interface {
mustEmbedUnimplementedResourceStoreServer()
}
func RegisterResourceStoreServer(s grpc.ServiceRegistrar, srv ResourceStoreServer) {
s.RegisterService(&ResourceStore_ServiceDesc, srv)
}
func _ResourceStore_GetResource_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetResourceRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).GetResource(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_GetResource_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).GetResource(ctx, req.(*GetResourceRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Create(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Create_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Create(ctx, req.(*CreateRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Update_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Update(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Update_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Update(ctx, req.(*UpdateRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Delete_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Delete(ctx, req.(*DeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_List_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).List(ctx, req.(*ListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(WatchRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(ResourceStoreServer).Watch(m, &resourceStoreWatchServer{ServerStream: stream})
}
type ResourceStore_WatchServer interface {
Send(*WatchResponse) error
grpc.ServerStream
}
type resourceStoreWatchServer struct {
grpc.ServerStream
}
func (x *resourceStoreWatchServer) Send(m *WatchResponse) error {
return x.ServerStream.SendMsg(m)
}
func _ResourceStore_GetBlob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetBlobRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).GetBlob(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_GetBlob_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).GetBlob(ctx, req.(*GetBlobRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_History_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HistoryRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).History(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_History_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).History(ctx, req.(*HistoryRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Origin_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(OriginRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Origin(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Origin_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Origin(ctx, req.(*OriginRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_IsHealthy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HealthCheckRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).IsHealthy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_IsHealthy_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).IsHealthy(ctx, req.(*HealthCheckRequest))
}
return interceptor(ctx, in, info, handler)
}
// ResourceStore_ServiceDesc is the grpc.ServiceDesc for ResourceStore service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ResourceStore_ServiceDesc = grpc.ServiceDesc{
ServiceName: "resource.ResourceStore",
HandlerType: (*ResourceStoreServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetResource",
Handler: _ResourceStore_GetResource_Handler,
},
{
MethodName: "Create",
Handler: _ResourceStore_Create_Handler,
},
{
MethodName: "Update",
Handler: _ResourceStore_Update_Handler,
},
{
MethodName: "Delete",
Handler: _ResourceStore_Delete_Handler,
},
{
MethodName: "List",
Handler: _ResourceStore_List_Handler,
},
{
MethodName: "GetBlob",
Handler: _ResourceStore_GetBlob_Handler,
},
{
MethodName: "History",
Handler: _ResourceStore_History_Handler,
},
{
MethodName: "Origin",
Handler: _ResourceStore_Origin_Handler,
},
{
MethodName: "IsHealthy",
Handler: _ResourceStore_IsHealthy_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Watch",
Handler: _ResourceStore_Watch_Handler,
ServerStreams: true,
},
},
Metadata: "resource.proto",
}
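And the server side, as a sketch: because the buf plugin option require_unimplemented_servers=false is used, embedding UnimplementedResourceStoreServer is optional, but doing so lets a partial implementation compile while the remaining methods return codes.Unimplemented. The listener address and the partialServer type are placeholders, not part of this commit:

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"

	"github.com/grafana/grafana/pkg/services/store/resource"
)

// partialServer only answers health checks; every other method falls through
// to the embedded UnimplementedResourceStoreServer.
type partialServer struct {
	resource.UnimplementedResourceStoreServer
}

func (partialServer) IsHealthy(ctx context.Context, req *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
	return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil
}

func main() {
	lis, err := net.Listen("tcp", ":10000") // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	srv := grpc.NewServer()
	resource.RegisterResourceStoreServer(srv, partialServer{})
	log.Fatal(srv.Serve(lis))
}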

View File

@ -0,0 +1,61 @@
{{/*
This is the list of all the fields in *entity.Entity, in a way that is
suitable to be imported by other templates that need to select these fields
from either the "entity" or the "entity_history" tables.
Example usage:
SELECT {{ template "common_entity_select_into" . }}
FROM {{ .Ident "entity" }} AS e
*/}}
{{ define "common_entity_select_into" }}
e.{{ .Ident "guid" | .Into .Entity.Guid }},
e.{{ .Ident "resource_version" | .Into .Entity.ResourceVersion }},
e.{{ .Ident "key" | .Into .Entity.Key }},
e.{{ .Ident "group" | .Into .Entity.Group }},
e.{{ .Ident "group_version" | .Into .Entity.GroupVersion }},
e.{{ .Ident "resource" | .Into .Entity.Resource }},
e.{{ .Ident "namespace" | .Into .Entity.Namespace }},
e.{{ .Ident "name" | .Into .Entity.Name }},
e.{{ .Ident "folder" | .Into .Entity.Folder }},
e.{{ .Ident "meta" | .Into .Entity.Meta }},
e.{{ .Ident "body" | .Into .Entity.Body }},
e.{{ .Ident "status" | .Into .Entity.Status }},
e.{{ .Ident "size" | .Into .Entity.Size }},
e.{{ .Ident "etag" | .Into .Entity.ETag }},
e.{{ .Ident "created_at" | .Into .Entity.CreatedAt }},
e.{{ .Ident "created_by" | .Into .Entity.CreatedBy }},
e.{{ .Ident "updated_at" | .Into .Entity.UpdatedAt }},
e.{{ .Ident "updated_by" | .Into .Entity.UpdatedBy }},
e.{{ .Ident "origin" | .Into .Entity.Origin.Source }},
e.{{ .Ident "origin_key" | .Into .Entity.Origin.Key }},
e.{{ .Ident "origin_ts" | .Into .Entity.Origin.Time }},
e.{{ .Ident "title" | .Into .Entity.Title }},
e.{{ .Ident "slug" | .Into .Entity.Slug }},
e.{{ .Ident "description" | .Into .Entity.Description }},
e.{{ .Ident "message" | .Into .Entity.Message }},
e.{{ .Ident "labels" | .Into .Entity.Labels }},
e.{{ .Ident "fields" | .Into .Entity.Fields }},
e.{{ .Ident "errors" | .Into .Entity.Errors }},
e.{{ .Ident "action" | .Into .Entity.Action }}
{{ end }}
{{/* Build an ORDER BY clause from a []SortBy contained in a .Sort field */}}
{{ define "common_order_by" }}
{{ $comma := listSep ", " }}
{{ range .Sort }}
{{- call $comma -}} {{ $.Ident .Field }} {{ .Direction.String }}
{{ end }}
{{ end }}

View File

@ -0,0 +1,37 @@
SELECT
{{ .Ident "rv" | .Into .Resource.Version }},
{{ .Ident "value" | .Into .Resource.Value }},
{{ .Ident "blob" | .Into .Resource.Blob }},
FROM "resource"
WHERE 1 = 1
AND {{ .Ident "namespace" }} = {{ .Arg .Key.Namespace }}
AND {{ .Ident "group" }} = {{ .Arg .Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Key.Resource }}
AND {{ .Ident "name" }} = {{ .Arg .Key.Name }}
{{/*
Resource versions work like snapshots at the kind level. Thus, a request
to retrieve a specific resource version should be interpreted as asking
for a resource as of how it existed at that point in time. This is why we
request matching entities with at most the provided resource version, and
return only the one with the highest resource version. In the case of not
specifying a resource version (i.e. resource version zero), it is
interpreted as the latest version of the given entity, thus we instead
query the "entity" table (which holds only the latest version of
non-deleted entities) and we don't need to specify anything else. The
"entity" table has a unique constraint on (namespace, group, resource,
name), so we're guaranteed to have at most one matching row.
*/}}
{{ if gt .ResourceVersion 0 }}
AND {{ .Ident "rv" }} <= {{ .Arg .ResourceVersion }}
ORDER BY {{ .Ident "rv" }} DESC
LIMIT 1
{{ end }}
{{ if .SelectForUpdate }}
{{ .SelectFor "UPDATE NOWAIT" }}
{{ end }}
;

View File

@ -0,0 +1,31 @@
INSERT INTO "resource"
{{/* Explicitly specify fields that will be set */}}
(
{{ .Ident "event" }},
{{ .Ident "group" }},
{{ .Ident "api_version" }},
{{ .Ident "namespace" }},
{{ .Ident "resource" }},
{{ .Ident "name" }},
{{ .Ident "operation" }},
{{ .Ident "message" }},
{{ .Ident "value" }},
{{ .Ident "hash" }},
{{ .Ident "blob" }},
)
{{/* Provide the values */}}
VALUES (
{{ .Arg .Event.ID }},
{{ .Arg .Event.Group }},
{{ .Arg .Event.ApiVersion }},
{{ .Arg .Event.Namespace }},
{{ .Arg .Event.Resource }},
{{ .Arg .Event.Name }},
{{ .Arg .Event.Operation }},
{{ .Arg .Event.Message }},
{{ .Arg .Event.Value }},
{{ .Arg .Event.Hash }},
{{ .Arg .Event.Blob }}
)
;

View File

@ -0,0 +1,8 @@
SELECT
{{ .Ident "rv" | .Into .ResourceVersion }}
FROM {{ .Ident "resource_version" }}
WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Resource }}
;

View File

@ -0,0 +1,9 @@
UPDATE {{ .Ident "resource_version" }}
SET
{{ .Ident "rv" }} = {{ .Arg .ResourceVersion }} + 1,
WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Resource }}
AND {{ .Ident "rv" }} = {{ .Arg .ResourceVersion }}
;

View File

@ -0,0 +1,13 @@
INSERT INTO {{ .Ident "resource_version" }}
(
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "rv" }},
)
VALUES (
{{ .Arg .Group }},
{{ .Arg .Resource }},
1
)
;

View File

@ -0,0 +1,7 @@
SELECT {{ .Ident "rv" | .Into .ResourceVersion }}
FROM {{ .Ident "resource_version" }}
WHERE 1 = 1
AND {{ .Ident "group" }} = {{ .Arg .Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Resource }}
{{ .SelectFor "UPDATE" }}
;

View File

@ -0,0 +1,222 @@
package sqlstash
import (
"context"
"database/sql"
"embed"
"errors"
"fmt"
"strings"
"text/template"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
)
// Templates setup.
var (
//go:embed data/*.sql
sqlTemplatesFS embed.FS
// all templates
helpers = template.FuncMap{
"listSep": helperListSep,
"join": helperJoin,
}
sqlTemplates = template.Must(template.New("sql").Funcs(helpers).ParseFS(sqlTemplatesFS, `data/*.sql`))
)
func mustTemplate(filename string) *template.Template {
if t := sqlTemplates.Lookup(filename); t != nil {
return t
}
panic(fmt.Sprintf("template file not found: %s", filename))
}
// Templates.
var (
sqlResourceVersionGet = mustTemplate("rv_get.sql")
sqlResourceVersionInc = mustTemplate("rv_inc.sql")
sqlResourceVersionInsert = mustTemplate("rv_insert.sql")
sqlResourceVersionLock = mustTemplate("rv_lock.sql")
sqlResourceInsert = mustTemplate("resource_insert.sql")
sqlResourceGet = mustTemplate("resource_get.sql")
)
// TxOptions.
var (
ReadCommitted = &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
}
ReadCommittedRO = &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: true,
}
)
// SQLError is an error returned by the database, which includes additionally
// debugging information about what was sent to the database.
type SQLError struct {
Err error
CallType string // either Query, QueryRow or Exec
TemplateName string
Query string
RawQuery string
ScanDest []any
// potentially regulated information is not exported and only directly
// available for local testing and local debugging purposes, making sure it
// is never marshaled to JSON or any other serialization.
arguments []any
}
func (e SQLError) Unwrap() error {
return e.Err
}
func (e SQLError) Error() string {
return fmt.Sprintf("%s: %s with %d input arguments and %d output "+
"destination arguments: %v", e.TemplateName, e.CallType,
len(e.arguments), len(e.ScanDest), e.Err)
}
//------------------------------------------------------------------------
// Resource Version table support
//------------------------------------------------------------------------
type returnsResourceVersion struct {
ResourceVersion int64
}
func (r *returnsResourceVersion) Results() (*returnsResourceVersion, error) {
return r, nil
}
type sqlResourceVersionGetRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
*returnsResourceVersion
}
func (r sqlResourceVersionGetRequest) Validate() error {
return nil // TODO
}
type sqlResourceVersionLockRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
*returnsResourceVersion
}
func (r sqlResourceVersionLockRequest) Validate() error {
return nil // TODO
}
type sqlResourceVersionIncRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
ResourceVersion int64
}
func (r sqlResourceVersionIncRequest) Validate() error {
return nil // TODO
}
type sqlResourceVersionInsertRequest struct {
*sqltemplate.SQLTemplate
Group string
Resource string
}
func (r sqlResourceVersionInsertRequest) Validate() error {
return nil // TODO
}
// resourceVersionAtomicInc atomically increases the resource version for a
// (group, resource) pair within a transaction.
func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (newVersion int64, err error) {
// 1. Lock the kind and get the latest version
lockReq := sqlResourceVersionLockRequest{
SQLTemplate: sqltemplate.New(d),
Group: group,
Resource: resource,
returnsResourceVersion: new(returnsResourceVersion),
}
kindv, err := queryRow(ctx, x, sqlResourceVersionLock, lockReq)
// if there wasn't a row associated with the given kind, we create one with
// version 1
if errors.Is(err, sql.ErrNoRows) {
// NOTE: there is a marginal chance that we race with another writer
// trying to create the same row. This is only possible when onboarding
// a new (Group, Resource) to the cell, which should be very unlikely,
// and the workaround is simply retrying. The alternative would be to
// use INSERT ... ON CONFLICT DO UPDATE ..., but that creates a
// requirement for support in Dialect only for this marginal case, and
// we would rather keep Dialect as small as possible. Another
// alternative is to simply check if the INSERT returns a DUPLICATE KEY
// error and then retry the original SELECT, but that also adds some
// complexity to the code. That would be preferable to changing
// Dialect, though. The current alternative, just retrying, seems to be
// enough for now.
insReq := sqlResourceVersionInsertRequest{
SQLTemplate: sqltemplate.New(d),
Group: group,
Resource: resource,
}
if _, err = exec(ctx, x, sqlResourceVersionInsert, insReq); err != nil {
return 0, fmt.Errorf("insert into kind_version: %w", err)
}
return 1, nil
}
if err != nil {
return 0, fmt.Errorf("lock kind: %w", err)
}
incReq := sqlResourceVersionIncRequest{
SQLTemplate: sqltemplate.New(d),
Group: group,
Resource: resource,
ResourceVersion: kindv.ResourceVersion,
}
if _, err = exec(ctx, x, sqlResourceVersionInc, incReq); err != nil {
return 0, fmt.Errorf("increase kind version: %w", err)
}
return kindv.ResourceVersion + 1, nil
}
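// The comment above notes that the create/create race on a brand-new
// (group, resource) pair is handled by simply retrying. Purely as an
// illustration (this helper is not part of the commit), a caller-side retry
// wrapper could look like this; it retries on any error for simplicity,
// since detecting a duplicate-key error portably is driver-specific.
func resourceVersionAtomicIncWithRetry(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (int64, error) {
	const maxAttempts = 3 // small arbitrary bound; the race is very unlikely
	var lastErr error
	for i := 0; i < maxAttempts; i++ {
		rv, err := resourceVersionAtomicInc(ctx, x, d, group, resource)
		if err == nil {
			return rv, nil
		}
		lastErr = err
	}
	return 0, lastErr
}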
// Template helpers.
// helperListSep is a helper that helps writing simpler loops in SQL templates.
// Example usage:
//
// {{ $comma := listSep ", " }}
// {{ range .Values }}
// {{/* here we put "-" on each end to remove extra white space */}}
// {{- call $comma -}}
// {{ .Value }}
// {{ end }}
func helperListSep(sep string) func() string {
var addSep bool
return func() string {
if addSep {
return sep
}
addSep = true
return ""
}
}
func helperJoin(sep string, elems ...string) string {
return strings.Join(elems, sep)
}

View File

@ -0,0 +1,323 @@
package sqlstash
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/services/store/resource"
)
const resourceTable = "resource"
const resourceVersionTable = "resource_version"
// Package-level errors.
var (
ErrNotFound = errors.New("entity not found")
ErrOptimisticLockingFailed = errors.New("optimistic locking failed")
ErrUserNotFoundInContext = errors.New("user not found in context")
ErrNextPageTokenNotSupported = errors.New("nextPageToken not yet supported")
ErrLimitNotSupported = errors.New("limit not yet supported")
ErrNotImplementedYet = errors.New("not implemented yet")
)
// Make sure we implement correct interfaces
var _ resource.ResourceStoreServer = &sqlResourceServer{}
func ProvideSQLEntityServer(db db.EntityDBInterface, tracer tracing.Tracer) (SqlResourceServer, error) {
ctx, cancel := context.WithCancel(context.Background())
server := &sqlResourceServer{
db: db,
log: log.New("sql-resource-server"),
ctx: ctx,
cancel: cancel,
tracer: tracer,
}
if err := prometheus.Register(sqlstash.NewStorageMetrics()); err != nil {
server.log.Warn("error registering storage server metrics", "error", err)
}
return server, nil
}
type SqlResourceServer interface {
resource.ResourceStoreServer
Init() error
Stop()
}
type sqlResourceServer struct {
log log.Logger
db db.EntityDBInterface // needed to keep xorm engine in scope
sess *session.SessionDB
dialect migrator.Dialect
broadcaster sqlstash.Broadcaster[*resource.WatchResponse]
ctx context.Context // TODO: remove
cancel context.CancelFunc
stream chan *resource.WatchResponse
tracer trace.Tracer
validator resource.RequestValidator
once sync.Once
initErr error
sqlDB db.DB
sqlDialect sqltemplate.Dialect
}
func (s *sqlResourceServer) Init() error {
s.once.Do(func() {
s.initErr = s.init()
})
if s.initErr != nil {
return fmt.Errorf("initialize Entity Server: %w", s.initErr)
}
return s.initErr
}
func (s *sqlResourceServer) init() error {
if s.sess != nil {
return nil
}
if s.db == nil {
return errors.New("missing db")
}
err := s.db.Init()
if err != nil {
return err
}
sqlDB, err := s.db.GetDB()
if err != nil {
return err
}
s.sqlDB = sqlDB
driverName := sqlDB.DriverName()
driverName = strings.TrimSuffix(driverName, "WithHooks")
switch driverName {
case db.DriverMySQL:
s.sqlDialect = sqltemplate.MySQL
case db.DriverPostgres:
s.sqlDialect = sqltemplate.PostgreSQL
case db.DriverSQLite, db.DriverSQLite3:
s.sqlDialect = sqltemplate.SQLite
default:
return fmt.Errorf("no dialect for driver %q", driverName)
}
sess, err := s.db.GetSession()
if err != nil {
return err
}
engine, err := s.db.GetEngine()
if err != nil {
return err
}
s.sess = sess
s.dialect = migrator.NewDialect(engine.DriverName())
s.validator = resource.NewSimpleValidator()
// set up the broadcaster
s.broadcaster, err = sqlstash.NewBroadcaster(s.ctx, func(stream chan *resource.WatchResponse) error {
s.stream = stream
// start the poller
go s.poller(stream)
return nil
})
if err != nil {
return err
}
return nil
}
func (s *sqlResourceServer) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
if err := s.sqlDB.PingContext(ctx); err != nil {
return nil, err
}
// TODO: check the status of the watcher implementation as well
return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil
}
func (s *sqlResourceServer) Stop() {
s.cancel()
}
func (s *sqlResourceServer) GetResource(ctx context.Context, req *resource.GetResourceRequest) (*resource.GetResourceResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.GetResource")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, GET: %+v", req.Key)
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) Create(ctx context.Context, req *resource.CreateRequest) (*resource.CreateResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
obj, err := s.validator.ValidateCreate(ctx, req)
if err != nil {
return nil, err
}
fmt.Printf("TODO, CREATE: %v", obj.GetName())
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) Update(ctx context.Context, req *resource.UpdateRequest) (*resource.UpdateResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
old, err := s.GetResource(ctx, &resource.GetResourceRequest{
Key: req.Key,
})
if err != nil {
return nil, err
}
if old == nil {
return nil, fmt.Errorf("could not get the old value")
}
obj, err := s.validator.ValidateUpdate(ctx, req, old)
if err != nil {
return nil, err
}
fmt.Printf("TODO, UPDATE: %+v", obj.GetName())
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) Delete(ctx context.Context, req *resource.DeleteRequest) (*resource.DeleteResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, DELETE: %+v", req.Key)
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
var rv int64
err := s.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
getReq := sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(s.sqlDialect),
Group: req.Options.Key.Group,
Resource: req.Options.Key.Resource,
returnsResourceVersion: new(returnsResourceVersion),
}
res, err := queryRow(ctx, tx, sqlResourceVersionGet, getReq)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
if res != nil {
rv = res.ResourceVersion
}
return nil
})
if err != nil {
return nil, err
}
fmt.Printf("TODO, LIST: %+v // %d", req.Options.Key, rv)
return nil, ErrNotImplementedYet
}
// Get the raw blob bytes and metadata
func (s *sqlResourceServer) GetBlob(ctx context.Context, req *resource.GetBlobRequest) (*resource.GetBlobResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, GET BLOB: %+v", req.Key)
return nil, ErrNotImplementedYet
}
// Show resource history (and trash)
func (s *sqlResourceServer) History(ctx context.Context, req *resource.HistoryRequest) (*resource.HistoryResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.History")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, GET History: %+v", req.Key)
return nil, ErrNotImplementedYet
}
// Used for efficient provisioning
func (s *sqlResourceServer) Origin(ctx context.Context, req *resource.OriginRequest) (*resource.OriginResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.History")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, GET History: %+v", req.Key)
return nil, ErrNotImplementedYet
}

View File

@ -0,0 +1,152 @@
package sqlstash
import (
"context"
"crypto/md5"
"database/sql"
"encoding/hex"
"fmt"
"text/template"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/services/store"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
)
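// createETag returns a hex-encoded MD5 digest computed over meta, body and
// status, in that order. For example, createETag(nil, nil, nil) returns
// "d41d8cd98f00b204e9800998ecf8427e" (the MD5 of empty input), as asserted in
// TestCreateETag.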
func createETag(body []byte, meta []byte, status []byte) string {
// TODO: can we change this to something more modern like sha256?
h := md5.New()
_, _ = h.Write(meta)
_, _ = h.Write(body)
_, _ = h.Write(status)
hash := h.Sum(nil)
return hex.EncodeToString(hash[:])
}
// getCurrentUser returns a string identifying the user making a request with
// the given context.
func getCurrentUser(ctx context.Context) (string, error) {
user, err := appcontext.User(ctx)
if err != nil || user == nil {
return "", fmt.Errorf("%w: %w", ErrUserNotFoundInContext, err)
}
return store.GetUserIDString(user), nil
}
// ptrOr returns the first non-nil pointer in the list or a new non-nil pointer.
func ptrOr[P ~*E, E any](ps ...P) P {
for _, p := range ps {
if p != nil {
return p
}
}
return P(new(E))
}
// sliceOr returns the first slice that has at least one element, or a new empty
// slice.
func sliceOr[S ~[]E, E comparable](vals ...S) S {
for _, s := range vals {
if len(s) > 0 {
return s
}
}
return S{}
}
// mapOr returns the first map that has at least one element, or a new empty
// map.
func mapOr[M ~map[K]V, K comparable, V any](vals ...M) M {
for _, m := range vals {
if len(m) > 0 {
return m
}
}
return M{}
}
// queryRow uses `req` as input and output for a single-row returning query
// generated with `tmpl`, and executed in `x`.
func queryRow[T any](ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.WithResults[T]) (T, error) {
var zero T
if err := req.Validate(); err != nil {
return zero, fmt.Errorf("query: invalid request for template %q: %w",
tmpl.Name(), err)
}
rawQuery, err := sqltemplate.Execute(tmpl, req)
if err != nil {
return zero, fmt.Errorf("execute template: %w", err)
}
query := sqltemplate.FormatSQL(rawQuery)
row := x.QueryRowContext(ctx, query, req.GetArgs()...)
if err := row.Err(); err != nil {
return zero, SQLError{
Err: err,
CallType: "QueryRow",
TemplateName: tmpl.Name(),
arguments: req.GetArgs(),
ScanDest: req.GetScanDest(),
Query: query,
RawQuery: rawQuery,
}
}
return scanRow(row, req)
}
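// A minimal usage sketch (the request and template identifiers mirror the
// resource server's List implementation and are only illustrative here):
//
//   getReq := sqlResourceVersionGetRequest{
//       SQLTemplate:            sqltemplate.New(dialect),
//       Group:                  key.Group,
//       Resource:               key.Resource,
//       returnsResourceVersion: new(returnsResourceVersion),
//   }
//   res, err := queryRow(ctx, tx, sqlResourceVersionGet, getReq)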
type scanner interface {
Scan(dest ...any) error
}
// scanRow is used on *sql.Row and *sql.Rows, and is factored out here not to
// improve code reuse, but rather for ease of testing.
func scanRow[T any](sc scanner, req sqltemplate.WithResults[T]) (zero T, err error) {
if err = sc.Scan(req.GetScanDest()...); err != nil {
return zero, fmt.Errorf("row scan: %w", err)
}
res, err := req.Results()
if err != nil {
return zero, fmt.Errorf("row results: %w", err)
}
return res, nil
}
// exec uses `req` as input for a non-data returning query generated with
// `tmpl`, and executed in `x`.
func exec(ctx context.Context, x db.ContextExecer, tmpl *template.Template, req sqltemplate.SQLTemplateIface) (sql.Result, error) {
if err := req.Validate(); err != nil {
return nil, fmt.Errorf("exec: invalid request for template %q: %w",
tmpl.Name(), err)
}
rawQuery, err := sqltemplate.Execute(tmpl, req)
if err != nil {
return nil, fmt.Errorf("execute template: %w", err)
}
query := sqltemplate.FormatSQL(rawQuery)
res, err := x.ExecContext(ctx, query, req.GetArgs()...)
if err != nil {
return nil, SQLError{
Err: err,
CallType: "Exec",
TemplateName: tmpl.Name(),
arguments: req.GetArgs(),
Query: query,
RawQuery: rawQuery,
}
}
return res, nil
}

View File

@ -0,0 +1,525 @@
package sqlstash
import (
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"io"
"regexp"
"strings"
"testing"
"text/template"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/db/dbimpl"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
sqltemplateMocks "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate/mocks"
"github.com/grafana/grafana/pkg/util/testutil"
)
// newMockDBNopSQL returns a db.DB and a sqlmock.Sqlmock that doesn't validate
// SQL. This is only meant to be used to test the wrapping utilities exec, query
// and queryRow, where the actual SQL is not relevant to the unit tests, but
// rather how the possible derived error conditions are handled.
func newMockDBNopSQL(t *testing.T) (db.DB, sqlmock.Sqlmock) {
t.Helper()
db, mock, err := sqlmock.New(
sqlmock.MonitorPingsOption(true),
sqlmock.QueryMatcherOption(sqlmock.QueryMatcherFunc(
func(expectedSQL, actualSQL string) error {
return nil
},
)),
)
return newUnitTestDB(t, db, mock, err)
}
// newMockDBMatchWords returns a db.DB and a sqlmock.Sqlmock that will match SQL
// by splitting the expected SQL string into words, and then trying to find all
// of them in the actual SQL, in the given order, case insensitively. Prepend a
// word with a `!` to say that word should not be found.
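// For example, an expected SQL of "insert into resource !resource_version"
// matches an actual statement containing the words "insert", "into" and
// "resource" in that order, provided the word "resource_version" does not
// appear after them (illustrative example of the rules described above).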
func newMockDBMatchWords(t *testing.T) (db.DB, sqlmock.Sqlmock) {
t.Helper()
db, mock, err := sqlmock.New(
sqlmock.MonitorPingsOption(true),
sqlmock.QueryMatcherOption(
sqlmock.QueryMatcherFunc(func(expectedSQL, actualSQL string) error {
actualSQL = strings.ToLower(sqltemplate.FormatSQL(actualSQL))
expectedSQL = strings.ToLower(expectedSQL)
var offset int
for _, vv := range mockDBMatchWordsRE.FindAllStringSubmatch(expectedSQL, -1) {
v := vv[1]
var shouldNotMatch bool
if v != "" && v[0] == '!' {
v = v[1:]
shouldNotMatch = true
}
if v == "" {
return fmt.Errorf("invalid expected word %q in %q", v,
expectedSQL)
}
reWord, err := regexp.Compile(`\b` + regexp.QuoteMeta(v) + `\b`)
if err != nil {
return fmt.Errorf("compile word %q from expected SQL: %s", v,
expectedSQL)
}
if shouldNotMatch {
if reWord.MatchString(actualSQL[offset:]) {
return fmt.Errorf("actual SQL fragent should not cont"+
"ain %q but it does\n\tFragment: %s\n\tFull SQL: %s",
v, actualSQL[offset:], actualSQL)
}
} else {
loc := reWord.FindStringIndex(actualSQL[offset:])
if len(loc) == 0 {
return fmt.Errorf("actual SQL fragment should contain "+
"%q but it doesn't\n\tFragment: %s\n\tFull SQL: %s",
v, actualSQL[offset:], actualSQL)
}
offset = loc[1] // advance the offset
}
}
return nil
},
),
),
)
return newUnitTestDB(t, db, mock, err)
}
var mockDBMatchWordsRE = regexp.MustCompile(`(?:\W|\A)(!?\w+)\b`)
func newUnitTestDB(t *testing.T, db *sql.DB, mock sqlmock.Sqlmock, err error) (db.DB, sqlmock.Sqlmock) {
t.Helper()
require.NoError(t, err)
return dbimpl.NewDB(db, "sqlmock"), mock
}
// mockResults aids in testing code paths with queries returning a large number
// of values, like those returning *entity.Entity. This is because we want to
// emulate returning the same row columns and row values that a real database
// would. This utility uses the same SQL template that the code under test is
// expected to use, to help populate all the expected fields.
type mockResults[T any] struct {
t *testing.T
tmpl *template.Template
data sqltemplate.WithResults[T]
rows *sqlmock.Rows
}
// newMockResults returns a new *mockResults. If you want to emulate a call
// returning zero rows, then immediately call the Rows method afterward.
func newMockResults[T any](t *testing.T, mock sqlmock.Sqlmock, tmpl *template.Template, data sqltemplate.WithResults[T]) *mockResults[T] {
t.Helper()
data.Reset()
err := tmpl.Execute(io.Discard, data)
require.NoError(t, err)
rows := mock.NewRows(data.GetColNames())
return &mockResults[T]{
t: t,
tmpl: tmpl,
data: data,
rows: rows,
}
}
// AddCurrentData uses the values contained in the `data` argument used during
// creation to populate a new expected row. It will access `data` with pointers,
// so you should replace the internal values of `data` with freshly allocated
// results to return different rows.
func (r *mockResults[T]) AddCurrentData() *mockResults[T] {
r.t.Helper()
r.data.Reset()
err := r.tmpl.Execute(io.Discard, r.data)
require.NoError(r.t, err)
d := r.data.GetScanDest()
dv := make([]driver.Value, len(d))
for i, v := range d {
dv[i] = v
}
r.rows.AddRow(dv...)
return r
}
// Rows returns the *sqlmock.Rows object built.
func (r *mockResults[T]) Rows() *sqlmock.Rows {
return r.rows
}
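// Typical usage (sketch; tmpl and data stand for the same SQL template and
// request value used by the code under test):
//
//   res := newMockResults(t, mock, tmpl, data)
//   res.AddCurrentData()
//   mock.ExpectQuery("").WillReturnRows(res.Rows())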
func TestCreateETag(t *testing.T) {
t.Parallel()
v := createETag(nil, nil, nil)
require.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", v)
}
func TestGetCurrentUser(t *testing.T) {
t.Parallel()
ctx := testutil.NewDefaultTestContext(t)
username, err := getCurrentUser(ctx)
require.NotEmpty(t, username)
require.NoError(t, err)
ctx = ctx.WithUser(nil)
username, err = getCurrentUser(ctx)
require.Empty(t, username)
require.Error(t, err)
require.ErrorIs(t, err, ErrUserNotFoundInContext)
}
func TestPtrOr(t *testing.T) {
t.Parallel()
p := ptrOr[*int]()
require.NotNil(t, p)
require.Zero(t, *p)
p = ptrOr[*int](nil, nil, nil, nil, nil, nil)
require.NotNil(t, p)
require.Zero(t, *p)
v := 42
v2 := 5
p = ptrOr(nil, nil, nil, &v, nil, &v2, nil, nil)
require.NotNil(t, p)
require.Equal(t, v, *p)
p = ptrOr(nil, nil, nil, &v)
require.NotNil(t, p)
require.Equal(t, v, *p)
}
func TestSliceOr(t *testing.T) {
t.Parallel()
p := sliceOr[[]int]()
require.NotNil(t, p)
require.Len(t, p, 0)
p = sliceOr[[]int](nil, nil, nil, nil)
require.NotNil(t, p)
require.Len(t, p, 0)
p = sliceOr([]int{}, []int{}, []int{}, []int{})
require.NotNil(t, p)
require.Len(t, p, 0)
v := []int{1, 2}
p = sliceOr([]int{}, nil, []int{}, v, nil, []int{}, []int{10}, nil)
require.NotNil(t, p)
require.Equal(t, v, p)
p = sliceOr([]int{}, nil, []int{}, v)
require.NotNil(t, p)
require.Equal(t, v, p)
}
func TestMapOr(t *testing.T) {
t.Parallel()
p := mapOr[map[string]int]()
require.NotNil(t, p)
require.Len(t, p, 0)
p = mapOr(nil, map[string]int(nil), nil, map[string]int{}, nil)
require.NotNil(t, p)
require.Len(t, p, 0)
v := map[string]int{"a": 0, "b": 1}
v2 := map[string]int{"c": 2, "d": 3}
p = mapOr(nil, map[string]int(nil), v, v2, nil, map[string]int{}, nil)
require.NotNil(t, p)
require.Equal(t, v, p)
p = mapOr(nil, map[string]int(nil), v)
require.NotNil(t, p)
require.Equal(t, v, p)
}
var (
validTestTmpl = template.Must(template.New("test").Parse("nothing special"))
invalidTestTmpl = template.New("no definition should fail to exec")
errTest = errors.New("because of reasons")
)
// expectRows is a testing helper to keep mocks in sync when adding rows to a
// mocked SQL result. It is used to test `query` and `queryRow`.
type expectRows[T any] struct {
*sqlmock.Rows
ExpectedResults []T
req *sqltemplateMocks.WithResults[T]
}
func newReturnsRow[T any](dbmock sqlmock.Sqlmock, req *sqltemplateMocks.WithResults[T]) *expectRows[T] {
return &expectRows[T]{
Rows: dbmock.NewRows(nil),
req: req,
}
}
// Add adds a new value that should be returned by the `query` or `queryRow`
// operation.
func (r *expectRows[T]) Add(value T, err error) *expectRows[T] {
r.req.EXPECT().GetScanDest().Return(nil).Once()
r.req.EXPECT().Results().Return(value, err).Once()
r.Rows.AddRow()
r.ExpectedResults = append(r.ExpectedResults, value)
return r
}
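// See the "happy path" case in TestQueryRow below for how expectRows is wired
// into a sqlmock expectation.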
func TestQueryRow(t *testing.T) {
t.Parallel()
t.Run("happy path", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewWithResults[int64](t)
db, dbmock := newMockDBNopSQL(t)
rows := newReturnsRow(dbmock, req)
// setup expectations
req.EXPECT().Validate().Return(nil).Once()
req.EXPECT().GetArgs().Return(nil).Once()
rows.Add(1, nil)
dbmock.ExpectQuery("").WillReturnRows(rows.Rows)
// execute and assert
res, err := queryRow(ctx, db, validTestTmpl, req)
require.NoError(t, err)
require.Equal(t, rows.ExpectedResults[0], res)
})
t.Run("invalid request", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewWithResults[int64](t)
db, _ := newMockDBNopSQL(t)
// setup expectations
req.EXPECT().Validate().Return(errTest).Once()
// execute and assert
res, err := queryRow(ctx, db, invalidTestTmpl, req)
require.Zero(t, res)
require.Error(t, err)
require.ErrorContains(t, err, "invalid request")
})
t.Run("error executing template", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewWithResults[int64](t)
db, _ := newMockDBNopSQL(t)
// setup expectations
req.EXPECT().Validate().Return(nil).Once()
// execute and assert
res, err := queryRow(ctx, db, invalidTestTmpl, req)
require.Zero(t, res)
require.Error(t, err)
require.ErrorContains(t, err, "execute template")
})
t.Run("error executing query", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewWithResults[int64](t)
db, dbmock := newMockDBNopSQL(t)
// setup expectations
req.EXPECT().Validate().Return(nil).Once()
req.EXPECT().GetArgs().Return(nil)
req.EXPECT().GetScanDest().Return(nil).Maybe()
dbmock.ExpectQuery("").WillReturnError(errTest)
// execute and assert
res, err := queryRow(ctx, db, validTestTmpl, req)
require.Zero(t, res)
require.Error(t, err)
require.ErrorAs(t, err, new(SQLError))
})
}
// scannerFunc is an adapter for the `scanner` interface.
type scannerFunc func(dest ...any) error
func (f scannerFunc) Scan(dest ...any) error {
return f(dest...)
}
func TestScanRow(t *testing.T) {
t.Parallel()
const value int64 = 1
t.Run("happy path", func(t *testing.T) {
t.Parallel()
// test declarations
req := sqltemplateMocks.NewWithResults[int64](t)
sc := scannerFunc(func(dest ...any) error {
return nil
})
// setup expectations
req.EXPECT().GetScanDest().Return(nil).Once()
req.EXPECT().Results().Return(value, nil).Once()
// execute and assert
res, err := scanRow(sc, req)
require.NoError(t, err)
require.Equal(t, value, res)
})
t.Run("scan error", func(t *testing.T) {
t.Parallel()
// test declarations
req := sqltemplateMocks.NewWithResults[int64](t)
sc := scannerFunc(func(dest ...any) error {
return errTest
})
// setup expectations
req.EXPECT().GetScanDest().Return(nil).Once()
// execute and assert
res, err := scanRow(sc, req)
require.Zero(t, res)
require.Error(t, err)
require.ErrorIs(t, err, errTest)
})
t.Run("results error", func(t *testing.T) {
t.Parallel()
// test declarations
req := sqltemplateMocks.NewWithResults[int64](t)
sc := scannerFunc(func(dest ...any) error {
return nil
})
// setup expectations
req.EXPECT().GetScanDest().Return(nil).Once()
req.EXPECT().Results().Return(0, errTest).Once()
// execute and assert
res, err := scanRow(sc, req)
require.Zero(t, res)
require.Error(t, err)
require.ErrorIs(t, err, errTest)
})
}
func TestExec(t *testing.T) {
t.Parallel()
t.Run("happy path", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewSQLTemplateIface(t)
db, dbmock := newMockDBNopSQL(t)
// setup expectations
req.EXPECT().Validate().Return(nil).Once()
req.EXPECT().GetArgs().Return(nil).Once()
dbmock.ExpectExec("").WillReturnResult(sqlmock.NewResult(0, 0))
// execute and assert
res, err := exec(ctx, db, validTestTmpl, req)
require.NoError(t, err)
require.NotNil(t, res)
})
t.Run("invalid request", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewSQLTemplateIface(t)
db, _ := newMockDBNopSQL(t)
// setup expectations
req.EXPECT().Validate().Return(errTest).Once()
// execute and assert
res, err := exec(ctx, db, invalidTestTmpl, req)
require.Nil(t, res)
require.Error(t, err)
require.ErrorContains(t, err, "invalid request")
})
t.Run("error executing template", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewSQLTemplateIface(t)
db, _ := newMockDBNopSQL(t)
// setup expectations
req.EXPECT().Validate().Return(nil).Once()
// execute and assert
res, err := exec(ctx, db, invalidTestTmpl, req)
require.Nil(t, res)
require.Error(t, err)
require.ErrorContains(t, err, "execute template")
})
t.Run("error executing SQL", func(t *testing.T) {
t.Parallel()
// test declarations
ctx := testutil.NewDefaultTestContext(t)
req := sqltemplateMocks.NewSQLTemplateIface(t)
db, dbmock := newMockDBNopSQL(t)
// setup expectations
req.EXPECT().Validate().Return(nil).Once()
req.EXPECT().GetArgs().Return(nil)
dbmock.ExpectExec("").WillReturnError(errTest)
// execute and assert
res, err := exec(ctx, db, validTestTmpl, req)
require.Nil(t, res)
require.Error(t, err)
require.ErrorAs(t, err, new(SQLError))
})
}

View File

@ -0,0 +1,60 @@
package sqlstash
import (
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/store/resource"
)
func (s *sqlResourceServer) Watch(*resource.WatchRequest, resource.ResourceStore_WatchServer) error {
return ErrNotImplementedYet
}
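// poller periodically polls for changes and forwards them to the watch stream.
// The polling body is still a stub (see the TODOs in poll below), so no events
// are emitted yet.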
func (s *sqlResourceServer) poller(stream chan *resource.WatchResponse) {
var err error
since := int64(0)
interval := 1 * time.Second
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-t.C:
since, err = s.poll(since, stream)
if err != nil {
s.log.Error("watch error", "err", err)
}
t.Reset(interval)
}
}
}
func (s *sqlResourceServer) poll(since int64, out chan *resource.WatchResponse) (int64, error) {
ctx, span := s.tracer.Start(s.ctx, "storage_server.poll")
defer span.End()
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "poll"}))
for hasmore := true; hasmore; {
err := func() error {
if false {
// TODO
out <- &resource.WatchResponse{}
}
// TODO, copy from entity store
hasmore = false
return nil
}()
if err != nil {
ctxLogger.Error("poll error", "error", err)
return since, err
}
}
return since, nil
}

View File

@ -0,0 +1,167 @@
package resource
import (
"context"
"encoding/json"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/services/apiserver/utils"
"github.com/grafana/grafana/pkg/services/auth/identity"
)
// Verify that all required fields are set, and the user has permission to set the common metadata fields
type RequestValidator interface {
ValidateCreate(ctx context.Context, req *CreateRequest) (utils.GrafanaResourceMetaAccessor, error)
ValidateUpdate(ctx context.Context, req *UpdateRequest, current *GetResourceResponse) (utils.GrafanaResourceMetaAccessor, error)
}
type simpleValidator struct {
folderAccess func(ctx context.Context, user identity.Requester, uid string) bool
originAccess func(ctx context.Context, user identity.Requester, origin string) bool
}
func NewSimpleValidator() RequestValidator {
return &simpleValidator{
// folderAccess: func(ctx context.Context, user identity.Requester, uid string) bool {
// 	return true // for now you can write anything to any folder
// },
}
}
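// dummyObject unmarshals only the standard TypeMeta and ObjectMeta from the
// stored value, so the common metadata can be validated without knowing the
// concrete resource type.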
type dummyObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
}
var _ RequestValidator = &simpleValidator{}
func readValue(ctx context.Context, value []byte) (identity.Requester, utils.GrafanaResourceMetaAccessor, error) {
// TODO -- we just need Identity not a full user!
user, err := appcontext.User(ctx)
if err != nil {
return nil, nil, err
}
dummy := &dummyObject{}
err = json.Unmarshal(value, dummy)
if err != nil {
return nil, nil, err
}
obj, err := utils.MetaAccessor(dummy)
return user, obj, err
}
// This is the validation that happens for both CREATE and UPDATE
func (v *simpleValidator) validate(ctx context.Context, user identity.Requester, obj utils.GrafanaResourceMetaAccessor) (utils.GrafanaResourceMetaAccessor, error) {
// To avoid confusion, let's not include the resource version in the saved value.
// This is a little unusual, but it prevents any ambiguity about the saved value
// carrying a stale (most likely the previous) resource version.
if obj.GetResourceVersion() != "" {
return obj, fmt.Errorf("do not save the resource version in the value")
}
// Make sure all common fields are populated
if obj.GetName() == "" {
return obj, fmt.Errorf("missing name")
}
if obj.GetAPIVersion() == "" {
return obj, fmt.Errorf("missing apiversion")
}
if obj.GetUID() == "" {
return obj, fmt.Errorf("the uid is not configured")
}
// Check folder access
folder := obj.GetFolder()
if folder != "" {
if v.folderAccess == nil {
return obj, fmt.Errorf("folder access not supported")
} else if !v.folderAccess(ctx, user, folder) {
return obj, fmt.Errorf("not allowed to write resource to folder")
}
}
// Make sure you can write values to this origin
origin, err := obj.GetOriginInfo()
if err != nil {
return nil, err
}
if origin != nil && v.originAccess != nil && !v.originAccess(ctx, user, origin.Name) {
return obj, fmt.Errorf("not allowed to write values to this origin")
}
return obj, nil
}
func (v *simpleValidator) ValidateCreate(ctx context.Context, req *CreateRequest) (utils.GrafanaResourceMetaAccessor, error) {
user, obj, err := readValue(ctx, req.Value)
if err != nil {
return nil, err
}
if obj.GetKind() != req.Key.Resource {
return obj, fmt.Errorf("expected resource kind")
}
if req.Key.ResourceVersion > 0 {
return obj, fmt.Errorf("create key must not include a resource version")
}
// Make sure the created by user is accurate
//----------------------------------------
val := obj.GetCreatedBy()
if val != "" && val != user.GetUID().String() {
return obj, fmt.Errorf("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyCreatedBy)
}
// Create can not have updated properties
//----------------------------------------
if obj.GetUpdatedBy() != "" {
return obj, fmt.Errorf("unexpected metadata.annotations#" + utils.AnnoKeyCreatedBy)
}
ts, err := obj.GetUpdatedTimestamp()
if err != nil {
return obj, err
}
if ts != nil {
return obj, fmt.Errorf("unexpected metadata.annotations#" + utils.AnnoKeyUpdatedTimestamp)
}
return v.validate(ctx, user, obj)
}
func (v *simpleValidator) ValidateUpdate(ctx context.Context, req *UpdateRequest, current *GetResourceResponse) (utils.GrafanaResourceMetaAccessor, error) {
user, obj, err := readValue(ctx, req.Value)
if err != nil {
return nil, err
}
if obj.GetKind() != req.Key.Resource {
return obj, fmt.Errorf("expected resource kind")
}
if req.Key.ResourceVersion > 0 && req.Key.ResourceVersion != current.ResourceVersion {
return obj, fmt.Errorf("resource version does not match (optimistic locking)")
}
_, oldobj, err := readValue(ctx, current.Value)
if err != nil {
return nil, err
}
if obj.GetCreatedBy() != oldobj.GetCreatedBy() {
return obj, fmt.Errorf(utils.AnnoKeyCreatedBy + " value has changed")
}
if obj.GetCreationTimestamp() != oldobj.GetCreationTimestamp() {
return obj, fmt.Errorf("creation time changed")
}
// Make sure the update user is accurate
//----------------------------------------
val := obj.GetUpdatedBy()
if val != "" && val != user.GetUID().String() {
return obj, fmt.Errorf("created by annotation does not match: metadata.annotations#" + utils.AnnoKeyUpdatedBy)
}
return v.validate(ctx, user, obj)
}