Restores in app platform (#97582)

This commit is contained in:
Stephanie Hingtgen 2024-12-13 15:55:43 -07:00 committed by GitHub
parent 4871cd8825
commit 8f6e9f8ed0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 2192 additions and 608 deletions

View File

@ -172,6 +172,7 @@ Experimental features might be changed or removed without prior notice.
| `disableClassicHTTPHistogram` | Disables classic HTTP Histogram (use with enableNativeHTTPHistogram) |
| `kubernetesSnapshots` | Routes snapshot requests from /api to the /apis endpoint |
| `kubernetesDashboards` | Use the kubernetes API in the frontend for dashboards |
| `kubernetesCliDashboards` | Use the k8s client to retrieve dashboards internally |
| `kubernetesFolders` | Use the kubernetes API in the frontend for folders, and route /api/folders requests to k8s |
| `grafanaAPIServerTestingWithExperimentalAPIs` | Facilitate integration testing of experimental APIs |
| `datasourceQueryTypes` | Show query type endpoints in datasource API servers (currently hardcoded for testdata, expressions, and prometheus) |

View File

@ -112,6 +112,7 @@ export interface FeatureToggles {
kubernetesPlaylists?: boolean;
kubernetesSnapshots?: boolean;
kubernetesDashboards?: boolean;
kubernetesCliDashboards?: boolean;
kubernetesFolders?: boolean;
grafanaAPIServerTestingWithExperimentalAPIs?: boolean;
datasourceQueryTypes?: boolean;

View File

@ -838,14 +838,14 @@ func getDashboardShouldReturn200WithConfig(t *testing.T, sc *scenarioContext, pr
if dashboardService == nil {
dashboardService, err = service.ProvideDashboardServiceImpl(
cfg, dashboardStore, folderStore, features, folderPermissions, dashboardPermissions,
ac, folderSvc, fStore, nil, zanzana.NewNoopClient(),
ac, folderSvc, fStore, nil, zanzana.NewNoopClient(), nil,
)
require.NoError(t, err)
}
dashboardProvisioningService, err := service.ProvideDashboardServiceImpl(
cfg, dashboardStore, folderStore, features, folderPermissions, dashboardPermissions,
ac, folderSvc, fStore, nil, zanzana.NewNoopClient(),
ac, folderSvc, fStore, nil, zanzana.NewNoopClient(), nil,
)
require.NoError(t, err)

View File

@ -478,7 +478,7 @@ func setupServer(b testing.TB, sc benchScenario, features featuremgmt.FeatureTog
dashboardSvc, err := dashboardservice.ProvideDashboardServiceImpl(
sc.cfg, dashStore, folderStore,
features, folderPermissions, dashboardPermissions, ac,
folderServiceWithFlagOn, fStore, nil, zanzana.NewNoopClient(),
folderServiceWithFlagOn, fStore, nil, zanzana.NewNoopClient(), nil,
)
require.NoError(b, err)

View File

@ -47,9 +47,11 @@ func GetAuthorizer(dashboardService dashboards.DashboardService, l log.Logger) a
}
// expensive path to lookup permissions for a single dashboard
// must include deleted to allow for restores
dto, err := dashboardService.GetDashboard(ctx, &dashboards.GetDashboardQuery{
UID: attr.GetName(),
OrgID: info.OrgID,
IncludeDeleted: true,
})
if err != nil {
return authorizer.DecisionDeny, "error loading dashboard", err

View File

@ -0,0 +1,108 @@
package dashboard
import (
"context"
"fmt"
"net/http"
"strconv"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
// LatestConnector will return the latest version of the resource - even if it is deleted
type LatestConnector interface {
rest.Storage
rest.Connecter
rest.StorageMetadata
}
func NewLatestConnector(unified resource.ResourceClient, gr schema.GroupResource, opts generic.RESTOptions, scheme *runtime.Scheme) LatestConnector {
return &latestREST{
unified: unified,
gr: gr,
opts: opts,
scheme: scheme,
}
}
type latestREST struct {
unified resource.ResourceClient
gr schema.GroupResource
opts generic.RESTOptions
scheme *runtime.Scheme
}
func (l *latestREST) New() runtime.Object {
return &metav1.PartialObjectMetadataList{}
}
func (l *latestREST) Destroy() {
}
func (l *latestREST) ConnectMethods() []string {
return []string{"GET"}
}
func (l *latestREST) ProducesMIMETypes(verb string) []string {
return nil
}
func (l *latestREST) ProducesObject(verb string) interface{} {
return &metav1.PartialObjectMetadataList{}
}
func (l *latestREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, false, ""
}
func (l *latestREST) Connect(ctx context.Context, uid string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
info, err := request.NamespaceInfoFrom(ctx, true)
if err != nil {
return nil, err
}
key := &resource.ResourceKey{
Namespace: info.Value,
Group: l.gr.Group,
Resource: l.gr.Resource,
Name: uid,
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
rsp, err := l.unified.Read(ctx, &resource.ReadRequest{
Key: key,
ResourceVersion: 0, // 0 will return the latest version that was not a delete event
IncludeDeleted: true,
})
if err != nil {
responder.Error(err)
return
} else if rsp == nil || (rsp.Error != nil && rsp.Error.Code == http.StatusNotFound) {
responder.Error(storage.NewKeyNotFoundError(uid, 0))
return
} else if rsp.Error != nil {
responder.Error(fmt.Errorf("could not retrieve object: %s", rsp.Error.Message))
return
}
uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, rsp.Value)
if err != nil {
responder.Error(fmt.Errorf("could not convert object: %s", err.Error()))
return
}
finalObj := uncastObj.(*unstructured.Unstructured)
finalObj.SetResourceVersion(strconv.FormatInt(rsp.ResourceVersion, 10))
responder.Object(http.StatusOK, finalObj)
}), nil
}

View File

@ -0,0 +1,101 @@
package dashboard
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"reflect"
"strconv"
"testing"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
)
func TestLatest(t *testing.T) {
gr := schema.GroupResource{
Group: "group",
Resource: "resource",
}
ctx := context.Background()
mockResponder := &mockResponder{}
mockClient := &mockResourceClient{}
r := &latestREST{
unified: mockClient,
gr: gr,
opts: generic.RESTOptions{},
}
t.Run("no namespace in context", func(t *testing.T) {
_, err := r.Connect(ctx, "test-uid", nil, mockResponder)
require.Error(t, err)
})
ctx = request.WithNamespace(context.Background(), "default")
t.Run("happy path", func(t *testing.T) {
req := httptest.NewRequest("GET", "/latest", nil)
w := httptest.NewRecorder()
readReq := &resource.ReadRequest{
Key: &resource.ResourceKey{
Namespace: "default",
Group: "group",
Resource: "resource",
Name: "uid",
},
ResourceVersion: 0,
IncludeDeleted: true,
}
expectedObject := &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
Kind: "resource",
APIVersion: "v0alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "uid",
Namespace: "default",
ResourceVersion: strconv.FormatInt(123, 10),
},
}
val, err := json.Marshal(expectedObject)
require.NoError(t, err)
mockClient.On("Read", ctx, readReq).Return(&resource.ReadResponse{
ResourceVersion: 123,
Value: val,
}, nil).Once()
mockResponder.On("Object", http.StatusOK, mock.MatchedBy(func(obj interface{}) bool {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
expectedMap := map[string]interface{}{
"apiVersion": expectedObject.APIVersion,
"kind": expectedObject.Kind,
"metadata": map[string]interface{}{
"name": expectedObject.Name,
"namespace": expectedObject.Namespace,
"resourceVersion": expectedObject.ResourceVersion,
"creationTimestamp": nil,
},
}
return ok && reflect.DeepEqual(unstructuredObj.Object, expectedMap)
}))
handler, err := r.Connect(ctx, "uid", nil, mockResponder)
require.NoError(t, err)
handler.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
mockClient.AssertExpectations(t)
mockResponder.AssertExpectations(t)
})
}

View File

@ -0,0 +1,125 @@
package dashboard
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
type RestoreConnector interface {
rest.Storage
rest.Connecter
rest.StorageMetadata
}
func NewRestoreConnector(unified resource.ResourceClient, gr schema.GroupResource, opts generic.RESTOptions) RestoreConnector {
return &restoreREST{
unified: unified,
gr: gr,
opts: opts,
}
}
type restoreREST struct {
unified resource.ResourceClient
gr schema.GroupResource
opts generic.RESTOptions
}
func (r *restoreREST) New() runtime.Object {
return &metav1.PartialObjectMetadataList{}
}
func (r *restoreREST) Destroy() {
}
func (r *restoreREST) ConnectMethods() []string {
return []string{"POST"}
}
func (r *restoreREST) ProducesMIMETypes(verb string) []string {
return nil
}
func (r *restoreREST) ProducesObject(verb string) interface{} {
return &metav1.PartialObjectMetadataList{}
}
func (r *restoreREST) NewConnectOptions() (runtime.Object, bool, string) {
return nil, false, ""
}
type RestoreOptions struct {
ResourceVersion int64 `json:"resourceVersion"`
}
func (r *restoreREST) Connect(ctx context.Context, uid string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
info, err := request.NamespaceInfoFrom(ctx, true)
if err != nil {
return nil, err
}
key := &resource.ResourceKey{
Namespace: info.Value,
Group: r.gr.Group,
Resource: r.gr.Resource,
Name: uid,
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
responder.Error(fmt.Errorf("unable to read request body: %s", err.Error()))
return
}
reqBody := &RestoreOptions{}
err = json.Unmarshal(body, &reqBody)
if err != nil {
responder.Error(fmt.Errorf("unable to unmarshal request body: %s", err.Error()))
return
}
if reqBody.ResourceVersion == 0 {
responder.Error(fmt.Errorf("resource version required"))
return
}
rsp, err := r.unified.Restore(ctx, &resource.RestoreRequest{
ResourceVersion: reqBody.ResourceVersion,
Key: key,
})
if err != nil {
responder.Error(err)
return
} else if rsp == nil || (rsp.Error != nil && rsp.Error.Code == http.StatusNotFound) {
responder.Error(storage.NewKeyNotFoundError(uid, reqBody.ResourceVersion))
return
} else if rsp.Error != nil {
responder.Error(fmt.Errorf("could not re-create object: %s", rsp.Error.Message))
return
}
obj := metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: key.Name,
Namespace: key.Namespace,
ResourceVersion: strconv.FormatInt(rsp.ResourceVersion, 10),
},
}
responder.Object(http.StatusOK, &obj)
}), nil
}

View File

@ -0,0 +1,128 @@
package dashboard
import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
)
type mockResourceClient struct {
mock.Mock
resource.ResourceClient
}
func (m *mockResourceClient) Restore(ctx context.Context, req *resource.RestoreRequest, opts ...grpc.CallOption) (*resource.RestoreResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*resource.RestoreResponse), args.Error(1)
}
func (m *mockResourceClient) Read(ctx context.Context, req *resource.ReadRequest, opts ...grpc.CallOption) (*resource.ReadResponse, error) {
args := m.Called(ctx, req)
return args.Get(0).(*resource.ReadResponse), args.Error(1)
}
type mockResponder struct {
mock.Mock
}
func (m *mockResponder) Object(statusCode int, obj runtime.Object) {
m.Called(statusCode, obj)
}
func (m *mockResponder) Error(err error) {
m.Called(err)
}
func TestRestore(t *testing.T) {
gr := schema.GroupResource{
Group: "group",
Resource: "resource",
}
ctx := context.Background()
mockResponder := &mockResponder{}
mockClient := &mockResourceClient{}
r := &restoreREST{
unified: mockClient,
gr: gr,
opts: generic.RESTOptions{},
}
t.Run("no namespace in context", func(t *testing.T) {
_, err := r.Connect(ctx, "test-uid", nil, mockResponder)
assert.Error(t, err)
})
ctx = request.WithNamespace(context.Background(), "default")
t.Run("invalid resourceVersion", func(t *testing.T) {
req := httptest.NewRequest("POST", "/restore", bytes.NewReader([]byte(`{"resourceVersion":0}`)))
w := httptest.NewRecorder()
expectedError := fmt.Errorf("resource version required")
mockResponder.On("Error", mock.MatchedBy(func(err error) bool {
return err.Error() == expectedError.Error()
}))
handler, err := r.Connect(ctx, "test-uid", nil, mockResponder)
assert.NoError(t, err)
handler.ServeHTTP(w, req)
mockResponder.AssertExpectations(t)
})
t.Run("happy path", func(t *testing.T) {
req := httptest.NewRequest("POST", "/restore", bytes.NewReader([]byte(`{"resourceVersion":123}`)))
w := httptest.NewRecorder()
restoreReq := &resource.RestoreRequest{
ResourceVersion: 123,
Key: &resource.ResourceKey{
Namespace: "default",
Group: "group",
Resource: "resource",
Name: "uid",
},
}
expectedObject := &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Name: "uid",
Namespace: "default",
ResourceVersion: strconv.FormatInt(123, 10),
},
}
mockClient.On("Restore", ctx, restoreReq).Return(&resource.RestoreResponse{
ResourceVersion: 123,
}, nil).Once()
mockResponder.On("Object", http.StatusOK, mock.MatchedBy(func(obj interface{}) bool {
metadata, ok := obj.(*metav1.PartialObjectMetadata)
return ok &&
metadata.ObjectMeta.Name == "uid" &&
metadata.ObjectMeta.Namespace == "default" &&
metadata.ObjectMeta.ResourceVersion == "123"
})).Return(expectedObject)
handler, err := r.Connect(ctx, "uid", nil, mockResponder)
assert.NoError(t, err)
handler.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
mockClient.AssertExpectations(t)
mockResponder.AssertExpectations(t)
})
}

View File

@ -156,6 +156,19 @@ func (b *DashboardsAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver
}
}
storage[dash.StoragePath("restore")] = dashboard.NewRestoreConnector(
b.unified,
dashboardv0alpha1.DashboardResourceInfo.GroupResource(),
defaultOpts,
)
storage[dash.StoragePath("latest")] = dashboard.NewLatestConnector(
b.unified,
dashboardv0alpha1.DashboardResourceInfo.GroupResource(),
defaultOpts,
scheme,
)
// Register the DTO endpoint that will consolidate all dashboard bits
storage[dash.StoragePath("dto")], err = dashboard.NewDTOConnector(
storage[dash.StoragePath()],

View File

@ -147,6 +147,19 @@ func (b *DashboardsAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver
}
}
storage[dash.StoragePath("restore")] = dashboard.NewRestoreConnector(
b.unified,
dashboardv1alpha1.DashboardResourceInfo.GroupResource(),
defaultOpts,
)
storage[dash.StoragePath("latest")] = dashboard.NewLatestConnector(
b.unified,
dashboardv1alpha1.DashboardResourceInfo.GroupResource(),
defaultOpts,
scheme,
)
// Register the DTO endpoint that will consolidate all dashboard bits
storage[dash.StoragePath("dto")], err = dashboard.NewDTOConnector(
storage[dash.StoragePath()],

View File

@ -147,6 +147,19 @@ func (b *DashboardsAPIBuilder) UpdateAPIGroupInfo(apiGroupInfo *genericapiserver
}
}
storage[dash.StoragePath("restore")] = dashboard.NewRestoreConnector(
b.unified,
dashboardv2alpha1.DashboardResourceInfo.GroupResource(),
defaultOpts,
)
storage[dash.StoragePath("latest")] = dashboard.NewLatestConnector(
b.unified,
dashboardv2alpha1.DashboardResourceInfo.GroupResource(),
defaultOpts,
scheme,
)
// Register the DTO endpoint that will consolidate all dashboard bits
storage[dash.StoragePath("dto")], err = dashboard.NewDTOConnector(
storage[dash.StoragePath()],

View File

@ -260,6 +260,8 @@ type GetDashboardQuery struct {
FolderID *int64
FolderUID *string
OrgID int64
IncludeDeleted bool // only supported when using unified storage
}
type DashboardTagCloudItem struct {

View File

@ -10,15 +10,24 @@ import (
"github.com/grafana/authlib/claims"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"golang.org/x/exp/slices"
"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/apis/dashboard/v0alpha1"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/metrics"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/apiserver"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/authz/zanzana"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/dashboards/dashboardaccess"
@ -31,6 +40,8 @@ import (
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
k8sUser "k8s.io/apiserver/pkg/authentication/user"
k8sRequest "k8s.io/apiserver/pkg/endpoints/request"
)
var (
@ -61,16 +72,42 @@ type DashboardServiceImpl struct {
dashboardPermissions accesscontrol.DashboardPermissionsService
ac accesscontrol.AccessControl
zclient zanzana.Client
k8sclient dashboardK8sHandler
metrics *dashboardsMetrics
}
// interface to allow for testing
type dashboardK8sHandler interface {
getClient(ctx context.Context, orgID int64) (dynamic.ResourceInterface, bool)
}
var _ dashboardK8sHandler = (*dashk8sHandler)(nil)
type dashk8sHandler struct {
namespacer request.NamespaceMapper
gvr schema.GroupVersionResource
restConfigProvider apiserver.RestConfigProvider
}
// This is the uber service that implements a three smaller services
func ProvideDashboardServiceImpl(
cfg *setting.Cfg, dashboardStore dashboards.Store, folderStore folder.FolderStore,
features featuremgmt.FeatureToggles, folderPermissionsService accesscontrol.FolderPermissionsService,
dashboardPermissionsService accesscontrol.DashboardPermissionsService, ac accesscontrol.AccessControl,
folderSvc folder.Service, fStore folder.Store, r prometheus.Registerer, zclient zanzana.Client,
restConfigProvider apiserver.RestConfigProvider,
) (*DashboardServiceImpl, error) {
gvr := schema.GroupVersionResource{
Group: v0alpha1.GROUP,
Version: v0alpha1.VERSION,
Resource: v0alpha1.DashboardResourceInfo.GetName(),
}
k8sHandler := &dashk8sHandler{
gvr: gvr,
namespacer: request.GetNamespaceMapper(cfg),
restConfigProvider: restConfigProvider,
}
dashSvc := &DashboardServiceImpl{
cfg: cfg,
log: log.New("dashboard-service"),
@ -82,6 +119,7 @@ func ProvideDashboardServiceImpl(
zclient: zclient,
folderStore: folderStore,
folderService: folderSvc,
k8sclient: k8sHandler,
metrics: newDashboardsMetrics(r),
}
@ -588,14 +626,92 @@ func (dr *DashboardServiceImpl) setDefaultFolderPermissions(ctx context.Context,
}
}
func (dk8s *dashk8sHandler) getClient(ctx context.Context, orgID int64) (dynamic.ResourceInterface, bool) {
dyn, err := dynamic.NewForConfig(dk8s.restConfigProvider.GetRestConfig(ctx))
if err != nil {
return nil, false
}
return dyn.Resource(dk8s.gvr).Namespace(dk8s.namespacer(orgID)), true
}
func (dr *DashboardServiceImpl) getK8sContext(ctx context.Context) (context.Context, context.CancelFunc, error) {
requester, requesterErr := identity.GetRequester(ctx)
if requesterErr != nil {
return nil, nil, requesterErr
}
user, exists := k8sRequest.UserFrom(ctx)
if !exists {
// add in k8s user if not there yet
var ok bool
user, ok = requester.(k8sUser.Info)
if !ok {
return nil, nil, fmt.Errorf("could not convert user to k8s user")
}
}
newCtx := k8sRequest.WithUser(context.Background(), user)
newCtx = log.WithContextualAttributes(newCtx, log.FromContext(ctx))
// TODO: after GLSA token workflow is removed, make this return early
// and move the else below to be unconditional
if requesterErr == nil {
newCtxWithRequester := identity.WithRequester(newCtx, requester)
newCtx = newCtxWithRequester
}
// inherit the deadline from the original context, if it exists
deadline, ok := ctx.Deadline()
if ok {
var newCancel context.CancelFunc
newCtx, newCancel = context.WithTimeout(newCtx, time.Until(deadline))
return newCtx, newCancel, nil
}
return newCtx, nil, nil
}
func (dr *DashboardServiceImpl) GetDashboard(ctx context.Context, query *dashboards.GetDashboardQuery) (*dashboards.Dashboard, error) {
// TODO: once getting dashboards by ID in unified storage is supported, we can remove the restraint of the uid being provided
if dr.features.IsEnabledGlobally(featuremgmt.FlagKubernetesCliDashboards) && query.UID != "" {
// create a new context - prevents issues when the request stems from the k8s api itself
// otherwise the context goes through the handlers twice and causes issues
newCtx, cancel, err := dr.getK8sContext(ctx)
if err != nil {
return nil, err
} else if cancel != nil {
defer cancel()
}
client, ok := dr.k8sclient.getClient(newCtx, query.OrgID)
if !ok {
return nil, nil
}
// if including deleted dashboards, use the /latest subresource
subresource := ""
if query.IncludeDeleted {
subresource = "latest"
}
out, err := client.Get(newCtx, query.UID, v1.GetOptions{}, subresource)
if err != nil {
return nil, err
} else if out == nil {
return nil, dashboards.ErrDashboardNotFound
}
return UnstructuredToLegacyDashboard(out, query.OrgID)
}
return dr.dashboardStore.GetDashboard(ctx, query)
}
// TODO: once getting dashboards by ID in unified storage is supported, need to do the same as the above function
func (dr *DashboardServiceImpl) GetDashboardUIDByID(ctx context.Context, query *dashboards.GetDashboardRefByIDQuery) (*dashboards.DashboardRef, error) {
return dr.dashboardStore.GetDashboardUIDByID(ctx, query)
}
// TODO: add support to get dashboards in unified storage.
func (dr *DashboardServiceImpl) GetDashboards(ctx context.Context, query *dashboards.GetDashboardsQuery) ([]*dashboards.Dashboard, error) {
return dr.dashboardStore.GetDashboards(ctx, query)
}
@ -628,7 +744,7 @@ func (dr *DashboardServiceImpl) getDashboardsSharedWithUser(ctx context.Context,
DashboardUIDs: dashboardUids,
OrgID: user.GetOrgID(),
}
sharedDashboards, err := dr.dashboardStore.GetDashboards(ctx, dashboardsQuery)
sharedDashboards, err := dr.GetDashboards(ctx, dashboardsQuery)
if err != nil {
return nil, err
}
@ -692,6 +808,7 @@ func (dr *DashboardServiceImpl) getUserSharedDashboardUIDs(ctx context.Context,
return userDashboardUIDs, nil
}
// TODO: add support to find dashboards in unified storage too.
func (dr *DashboardServiceImpl) FindDashboards(ctx context.Context, query *dashboards.FindPersistedDashboardsQuery) ([]dashboards.DashboardSearchProjection, error) {
ctx, span := tracer.Start(ctx, "dashboards.service.FindDashboards")
defer span.End()
@ -838,3 +955,67 @@ func (dr *DashboardServiceImpl) CleanUpDeletedDashboards(ctx context.Context) (i
return deletedDashboardsCount, nil
}
func UnstructuredToLegacyDashboard(item *unstructured.Unstructured, orgID int64) (*dashboards.Dashboard, error) {
spec, ok := item.Object["spec"].(map[string]any)
if !ok {
return nil, errors.New("error parsing dashboard from k8s response")
}
out := dashboards.Dashboard{
OrgID: orgID,
Data: simplejson.NewFromAny(spec),
}
obj, err := utils.MetaAccessor(item)
if err != nil {
return nil, err
}
out.UID = obj.GetName()
out.Slug = obj.GetSlug()
out.FolderUID = obj.GetFolder()
out.Created = obj.GetCreationTimestamp().Time
updated, err := obj.GetUpdatedTimestamp()
if err == nil && updated != nil {
out.Updated = *updated
}
deleted := obj.GetDeletionTimestamp()
if deleted != nil {
out.Deleted = obj.GetDeletionTimestamp().Time
}
if id, ok := spec["id"].(int64); ok {
out.ID = id
}
if gnetID, ok := spec["gnet_id"].(int64); ok {
out.GnetID = gnetID
}
if version, ok := spec["version"].(int64); ok {
out.Version = int(version)
}
if pluginID, ok := spec["plugin_id"].(string); ok {
out.PluginID = pluginID
}
if isFolder, ok := spec["is_folder"].(bool); ok {
out.IsFolder = isFolder
}
if hasACL, ok := spec["has_acl"].(bool); ok {
out.HasACL = hasACL
}
if title, ok := spec["title"].(string); ok {
out.Title = title
// if slug isn't in the metadata, add it via the title
if out.Slug == "" {
out.UpdateSlug()
}
}
return &out, nil
}

View File

@ -885,6 +885,7 @@ func permissionScenario(t *testing.T, desc string, canSave bool, fn permissionSc
folder.NewFakeStore(),
nil,
zanzana.NewNoopClient(),
nil,
)
require.NoError(t, err)
guardian.InitAccessControlGuardian(cfg, ac, dashboardService)
@ -952,6 +953,7 @@ func callSaveWithResult(t *testing.T, cmd dashboards.SaveDashboardCommand, sqlSt
folder.NewFakeStore(),
nil,
zanzana.NewNoopClient(),
nil,
)
require.NoError(t, err)
res, err := service.SaveDashboard(context.Background(), &dto, false)
@ -978,6 +980,7 @@ func callSaveWithError(t *testing.T, cmd dashboards.SaveDashboardCommand, sqlSto
folder.NewFakeStore(),
nil,
zanzana.NewNoopClient(),
nil,
)
require.NoError(t, err)
_, err = service.SaveDashboard(context.Background(), &dto, false)
@ -1023,6 +1026,7 @@ func saveTestDashboard(t *testing.T, title string, orgID int64, folderUID string
folder.NewFakeStore(),
nil,
zanzana.NewNoopClient(),
nil,
)
require.NoError(t, err)
res, err := service.SaveDashboard(context.Background(), &dto, false)
@ -1075,6 +1079,7 @@ func saveTestFolder(t *testing.T, title string, orgID int64, sqlStore db.DB) *da
folder.NewFakeStore(),
nil,
zanzana.NewNoopClient(),
nil,
)
require.NoError(t, err)
res, err := service.SaveDashboard(context.Background(), &dto, false)

View File

@ -2,10 +2,13 @@ package service
import (
"context"
"reflect"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"k8s.io/client-go/dynamic"
"github.com/grafana/grafana/pkg/apimachinery/identity"
"github.com/grafana/grafana/pkg/components/simplejson"
@ -17,6 +20,8 @@ import (
"github.com/grafana/grafana/pkg/services/guardian"
"github.com/grafana/grafana/pkg/services/user"
"github.com/grafana/grafana/pkg/setting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
func TestDashboardService(t *testing.T) {
@ -223,3 +228,135 @@ func TestDashboardService(t *testing.T) {
})
})
}
type mockDashK8sCli struct {
mock.Mock
}
func (m *mockDashK8sCli) getClient(ctx context.Context, orgID int64) (dynamic.ResourceInterface, bool) {
args := m.Called(ctx, orgID)
return args.Get(0).(dynamic.ResourceInterface), args.Bool(1)
}
type mockResourceInterface struct {
mock.Mock
dynamic.ResourceInterface
}
func (m *mockResourceInterface) Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) {
args := m.Called(ctx, name, options, subresources)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*unstructured.Unstructured), args.Error(1)
}
func TestGetDashboard(t *testing.T) {
fakeStore := dashboards.FakeDashboardStore{}
defer fakeStore.AssertExpectations(t)
service := &DashboardServiceImpl{
cfg: setting.NewCfg(),
dashboardStore: &fakeStore,
}
t.Run("Should fallback to dashboard store if Kubernetes feature flags are not enabled", func(t *testing.T) {
service.features = featuremgmt.WithFeatures()
query := &dashboards.GetDashboardQuery{
UID: "test-uid",
OrgID: 1,
}
fakeStore.On("GetDashboard", mock.Anything, query).Return(&dashboards.Dashboard{}, nil).Once()
dashboard, err := service.GetDashboard(context.Background(), query)
require.NoError(t, err)
require.NotNil(t, dashboard)
fakeStore.AssertExpectations(t)
})
t.Run("Should use Kubernetes client if feature flags are enabled", func(t *testing.T) {
k8sClientMock := new(mockDashK8sCli)
k8sResourceMock := new(mockResourceInterface)
service.k8sclient = k8sClientMock
service.features = featuremgmt.WithFeatures(featuremgmt.FlagKubernetesCliDashboards)
query := &dashboards.GetDashboardQuery{
UID: "test-uid",
OrgID: 1,
}
dashboardUnstructured := unstructured.Unstructured{Object: map[string]any{
"metadata": map[string]any{
"name": "uid",
},
"spec": map[string]any{
"test": "test",
"title": "testing slugify",
},
}}
dashboardExpected := dashboards.Dashboard{
UID: "uid", // uid is the name of the k8s object
Title: "testing slugify",
Slug: "testing-slugify", // slug is taken from title
OrgID: 1, // orgID is populated from the query
Data: simplejson.NewFromAny(map[string]any{"test": "test", "title": "testing slugify"}),
}
ctx := context.Background()
userCtx := &user.SignedInUser{UserID: 1}
ctx = identity.WithRequester(ctx, userCtx)
k8sClientMock.On("getClient", mock.Anything, int64(1)).Return(k8sResourceMock, true).Once()
k8sResourceMock.On("Get", mock.Anything, query.UID, mock.Anything, mock.Anything).Return(&dashboardUnstructured, nil).Once()
dashboard, err := service.GetDashboard(ctx, query)
require.NoError(t, err)
require.NotNil(t, dashboard)
k8sClientMock.AssertExpectations(t)
// make sure the conversion is working
require.True(t, reflect.DeepEqual(dashboard, &dashboardExpected))
})
t.Run("Should return error when Kubernetes client fails", func(t *testing.T) {
k8sClientMock := new(mockDashK8sCli)
k8sResourceMock := new(mockResourceInterface)
service.k8sclient = k8sClientMock
service.features = featuremgmt.WithFeatures(featuremgmt.FlagKubernetesCliDashboards)
query := &dashboards.GetDashboardQuery{
UID: "test-uid",
OrgID: 1,
}
ctx := context.Background()
userCtx := &user.SignedInUser{UserID: 1}
ctx = identity.WithRequester(ctx, userCtx)
k8sClientMock.On("getClient", mock.Anything, int64(1)).Return(k8sResourceMock, true).Once()
k8sResourceMock.On("Get", mock.Anything, query.UID, mock.Anything, mock.Anything).Return(nil, assert.AnError).Once()
dashboard, err := service.GetDashboard(ctx, query)
require.Error(t, err)
require.Nil(t, dashboard)
k8sClientMock.AssertExpectations(t)
})
t.Run("Should return dashboard not found if Kubernetes client returns nil", func(t *testing.T) {
k8sClientMock := new(mockDashK8sCli)
k8sResourceMock := new(mockResourceInterface)
service.k8sclient = k8sClientMock
service.features = featuremgmt.WithFeatures(featuremgmt.FlagKubernetesCliDashboards)
query := &dashboards.GetDashboardQuery{
UID: "test-uid",
OrgID: 1,
}
ctx := context.Background()
userCtx := &user.SignedInUser{UserID: 1}
ctx = identity.WithRequester(ctx, userCtx)
k8sClientMock.On("getClient", mock.Anything, int64(1)).Return(k8sResourceMock, true).Once()
k8sResourceMock.On("Get", mock.Anything, query.UID, mock.Anything, mock.Anything).Return(nil, nil).Once()
dashboard, err := service.GetDashboard(ctx, query)
require.Error(t, err)
require.Equal(t, dashboards.ErrDashboardNotFound, err)
require.Nil(t, dashboard)
k8sClientMock.AssertExpectations(t)
})
}

View File

@ -122,7 +122,7 @@ func (dr *DashboardServiceImpl) findDashboardsZanzanaGeneric(ctx context.Context
}
for len(result) < int(limit) {
findRes, err := dr.dashboardStore.FindDashboards(ctx, &query)
findRes, err := dr.FindDashboards(ctx, &query)
if err != nil {
return nil, err
}

View File

@ -73,6 +73,7 @@ func TestIntegrationDashboardServiceZanzana(t *testing.T) {
fStore,
nil,
zclient,
nil,
)
require.NoError(t, err)

View File

@ -98,9 +98,10 @@ func TestValidateDashboardExists(t *testing.T) {
cfg := setting.NewCfg()
dsStore := dashsnapdb.ProvideStore(sqlStore, cfg)
secretsService := secretsManager.SetupTestService(t, database.ProvideSecretsStore(sqlStore))
dashboardStore, err := dashdb.ProvideDashboardStore(sqlStore, cfg, featuremgmt.WithFeatures(), tagimpl.ProvideService(sqlStore), quotatest.New(false, nil))
feats := featuremgmt.WithFeatures()
dashboardStore, err := dashdb.ProvideDashboardStore(sqlStore, cfg, feats, tagimpl.ProvideService(sqlStore), quotatest.New(false, nil))
require.NoError(t, err)
dashSvc, err := dashsvc.ProvideDashboardServiceImpl(cfg, dashboardStore, folderimpl.ProvideDashboardFolderStore(sqlStore), nil, nil, nil, acmock.New(), foldertest.NewFakeService(), folder.NewFakeStore(), nil, zanzana.NewNoopClient())
dashSvc, err := dashsvc.ProvideDashboardServiceImpl(cfg, dashboardStore, folderimpl.ProvideDashboardFolderStore(sqlStore), feats, nil, nil, acmock.New(), foldertest.NewFakeService(), folder.NewFakeStore(), nil, zanzana.NewNoopClient(), nil)
require.NoError(t, err)
s := ProvideService(dsStore, secretsService, dashSvc)
ctx := context.Background()

View File

@ -713,6 +713,12 @@ var (
Owner: grafanaAppPlatformSquad,
FrontendOnly: true,
},
{
Name: "kubernetesCliDashboards",
Description: "Use the k8s client to retrieve dashboards internally",
Stage: FeatureStageExperimental,
Owner: grafanaAppPlatformSquad,
},
{
Name: "kubernetesFolders",
Description: "Use the kubernetes API in the frontend for folders, and route /api/folders requests to k8s",

View File

@ -93,6 +93,7 @@ transformationsVariableSupport,GA,@grafana/dataviz-squad,false,false,true
kubernetesPlaylists,GA,@grafana/grafana-app-platform-squad,false,true,false
kubernetesSnapshots,experimental,@grafana/grafana-app-platform-squad,false,true,false
kubernetesDashboards,experimental,@grafana/grafana-app-platform-squad,false,false,true
kubernetesCliDashboards,experimental,@grafana/grafana-app-platform-squad,false,false,false
kubernetesFolders,experimental,@grafana/search-and-storage,false,false,false
grafanaAPIServerTestingWithExperimentalAPIs,experimental,@grafana/search-and-storage,false,false,false
datasourceQueryTypes,experimental,@grafana/grafana-app-platform-squad,false,true,false

1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
93 kubernetesPlaylists GA @grafana/grafana-app-platform-squad false true false
94 kubernetesSnapshots experimental @grafana/grafana-app-platform-squad false true false
95 kubernetesDashboards experimental @grafana/grafana-app-platform-squad false false true
96 kubernetesCliDashboards experimental @grafana/grafana-app-platform-squad false false false
97 kubernetesFolders experimental @grafana/search-and-storage false false false
98 grafanaAPIServerTestingWithExperimentalAPIs experimental @grafana/search-and-storage false false false
99 datasourceQueryTypes experimental @grafana/grafana-app-platform-squad false true false

View File

@ -383,6 +383,10 @@ const (
// Use the kubernetes API in the frontend for dashboards
FlagKubernetesDashboards = "kubernetesDashboards"
// FlagKubernetesCliDashboards
// Use the k8s client to retrieve dashboards internally
FlagKubernetesCliDashboards = "kubernetesCliDashboards"
// FlagKubernetesFolders
// Use the kubernetes API in the frontend for folders, and route /api/folders requests to k8s
FlagKubernetesFolders = "kubernetesFolders"

View File

@ -1874,6 +1874,18 @@
"requiresRestart": true
}
},
{
"metadata": {
"name": "kubernetesCliDashboards",
"resourceVersion": "1733520389522",
"creationTimestamp": "2024-12-06T21:26:29Z"
},
"spec": {
"description": "Use the k8s client to retrieve dashboards internally",
"stage": "experimental",
"codeowner": "@grafana/grafana-app-platform-squad"
}
},
{
"metadata": {
"name": "kubernetesDashboards",

View File

@ -490,7 +490,7 @@ func TestIntegrationNestedFolderService(t *testing.T) {
CanEditValue: true,
})
dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuresFlagOn, folderPermissions, dashboardPermissions, ac, serviceWithFlagOn, nestedFolderStore, nil, zanzana.NewNoopClient())
dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuresFlagOn, folderPermissions, dashboardPermissions, ac, serviceWithFlagOn, nestedFolderStore, nil, zanzana.NewNoopClient(), nil)
require.NoError(t, err)
alertStore, err := ngstore.ProvideDBStore(cfg, featuresFlagOn, db, serviceWithFlagOn, dashSrv, ac, b)
@ -573,7 +573,7 @@ func TestIntegrationNestedFolderService(t *testing.T) {
})
dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, featuresFlagOff,
folderPermissions, dashboardPermissions, ac, serviceWithFlagOff, nestedFolderStore, nil, zanzana.NewNoopClient())
folderPermissions, dashboardPermissions, ac, serviceWithFlagOff, nestedFolderStore, nil, zanzana.NewNoopClient(), nil)
require.NoError(t, err)
alertStore, err := ngstore.ProvideDBStore(cfg, featuresFlagOff, db, serviceWithFlagOff, dashSrv, ac, b)
@ -719,7 +719,7 @@ func TestIntegrationNestedFolderService(t *testing.T) {
tc.service.dashboardStore = dashStore
tc.service.store = nestedFolderStore
dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, tc.featuresFlag, folderPermissions, dashboardPermissions, ac, tc.service, tc.service.store, nil, zanzana.NewNoopClient())
dashSrv, err := dashboardservice.ProvideDashboardServiceImpl(cfg, dashStore, folderStore, tc.featuresFlag, folderPermissions, dashboardPermissions, ac, tc.service, tc.service.store, nil, zanzana.NewNoopClient(), nil)
require.NoError(t, err)
alertStore, err := ngstore.ProvideDBStore(cfg, tc.featuresFlag, db, tc.service, dashSrv, ac, b)
require.NoError(t, err)
@ -1505,6 +1505,7 @@ func TestIntegrationNestedFolderSharedWithMe(t *testing.T) {
nestedFolderStore,
nil,
zanzana.NewNoopClient(),
nil,
)
require.NoError(t, err)

View File

@ -311,6 +311,7 @@ func createDashboard(t *testing.T, sqlStore db.DB, user user.SignedInUser, dash
folder.NewFakeStore(),
nil,
zanzana.NewNoopClient(),
nil,
)
require.NoError(t, err)
dashboard, err := service.SaveDashboard(context.Background(), dashItem, true)
@ -398,6 +399,7 @@ func scenarioWithPanel(t *testing.T, desc string, fn func(t *testing.T, sc scena
features, folderPermissions, dashboardPermissions, ac,
foldertest.NewFakeService(), folder.NewFakeStore(),
nil, zanzana.NewNoopClient(),
nil,
)
require.NoError(t, svcErr)
guardian.InitAccessControlGuardian(cfg, ac, dashboardService)
@ -459,7 +461,7 @@ func testScenario(t *testing.T, desc string, fn func(t *testing.T, sc scenarioCo
cfg, dashboardStore, folderStore,
features, folderPermissions, dashboardPermissions, ac,
foldertest.NewFakeService(), folder.NewFakeStore(),
nil, zanzana.NewNoopClient(),
nil, zanzana.NewNoopClient(), nil,
)
require.NoError(t, dashSvcErr)
guardian.InitAccessControlGuardian(cfg, ac, dashService)

View File

@ -736,7 +736,7 @@ func createDashboard(t *testing.T, sqlStore db.DB, user *user.SignedInUser, dash
cfg, dashboardStore, folderStore,
featuremgmt.WithFeatures(), acmock.NewMockedPermissionsService(), dashPermissionService, ac,
foldertest.NewFakeService(), folder.NewFakeStore(),
nil, zanzana.NewNoopClient(),
nil, zanzana.NewNoopClient(), nil,
)
require.NoError(t, err)
dashboard, err := service.SaveDashboard(context.Background(), dashItem, true)
@ -833,7 +833,7 @@ func testScenario(t *testing.T, desc string, fn func(t *testing.T, sc scenarioCo
cfg, dashStore, folderStore,
features, acmock.NewMockedPermissionsService(), dashPermissionService, ac,
foldertest.NewFakeService(), folder.NewFakeStore(),
nil, zanzana.NewNoopClient(),
nil, zanzana.NewNoopClient(), nil,
)
require.NoError(t, err)
guardian.InitAccessControlGuardian(cfg, ac, dashService)

View File

@ -62,7 +62,7 @@ func SetupDashboardService(tb testing.TB, sqlStore db.DB, fs *folderimpl.Dashboa
cfg, dashboardStore, fs,
features, folderPermissions, dashboardPermissions, ac,
foldertest.NewFakeService(), folder.NewFakeStore(),
nil, zanzana.NewNoopClient(),
nil, zanzana.NewNoopClient(), nil,
)
require.NoError(tb, err)

View File

@ -326,7 +326,7 @@ func TestIntegrationUnauthenticatedUserCanGetPubdashPanelQueryData(t *testing.T)
dashService, err := service.ProvideDashboardServiceImpl(
cfg, dashboardStoreService, folderStore,
featuremgmt.WithFeatures(), acmock.NewMockedPermissionsService(), dashPermissionService, ac,
foldertest.NewFakeService(), folder.NewFakeStore(), nil, zanzana.NewNoopClient(),
foldertest.NewFakeService(), folder.NewFakeStore(), nil, zanzana.NewNoopClient(), nil,
)
require.NoError(t, err)

File diff suppressed because it is too large Load Diff

View File

@ -192,7 +192,9 @@ message ReadRequest {
ResourceKey key = 1;
// Optionally pick an explicit resource version
int64 resource_version = 3;
int64 resource_version = 2;
// Optionally decide to return the latest RV if deleted
bool include_deleted = 3;
}
message ReadResponse {
@ -643,6 +645,25 @@ message ResourceTableRow {
bytes object = 4;
}
//----------------------------
// Restore Support
//----------------------------
message RestoreRequest {
// Full key must be set
ResourceKey key = 1;
// The resource version to restore
int64 resource_version = 2;
}
message RestoreResponse {
// Error details
ErrorResult error = 1;
// The updated resource version
int64 resource_version = 2;
}
//----------------------------
// Blob Support
@ -730,6 +751,7 @@ service ResourceStore {
rpc Create(CreateRequest) returns (CreateResponse);
rpc Update(UpdateRequest) returns (UpdateResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc Restore(RestoreRequest) returns (RestoreResponse);
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.4.0
// - protoc-gen-go-grpc v1.5.1
// - protoc (unknown)
// source: resource.proto
@ -15,14 +15,15 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.62.0 or later.
const _ = grpc.SupportPackageIsVersion8
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
ResourceStore_Read_FullMethodName = "/resource.ResourceStore/Read"
ResourceStore_Create_FullMethodName = "/resource.ResourceStore/Create"
ResourceStore_Update_FullMethodName = "/resource.ResourceStore/Update"
ResourceStore_Delete_FullMethodName = "/resource.ResourceStore/Delete"
ResourceStore_Restore_FullMethodName = "/resource.ResourceStore/Restore"
ResourceStore_List_FullMethodName = "/resource.ResourceStore/List"
ResourceStore_Watch_FullMethodName = "/resource.ResourceStore/Watch"
)
@ -40,6 +41,7 @@ type ResourceStoreClient interface {
Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error)
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
Restore(ctx context.Context, in *RestoreRequest, opts ...grpc.CallOption) (*RestoreResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
@ -47,7 +49,7 @@ type ResourceStoreClient interface {
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error)
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[WatchEvent], error)
}
type resourceStoreClient struct {
@ -98,6 +100,16 @@ func (c *resourceStoreClient) Delete(ctx context.Context, in *DeleteRequest, opt
return out, nil
}
func (c *resourceStoreClient) Restore(ctx context.Context, in *RestoreRequest, opts ...grpc.CallOption) (*RestoreResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RestoreResponse)
err := c.cc.Invoke(ctx, ResourceStore_Restore_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *resourceStoreClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ListResponse)
@ -108,13 +120,13 @@ func (c *resourceStoreClient) List(ctx context.Context, in *ListRequest, opts ..
return out, nil
}
func (c *resourceStoreClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error) {
func (c *resourceStoreClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[WatchEvent], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &ResourceStore_ServiceDesc.Streams[0], ResourceStore_Watch_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &resourceStoreWatchClient{ClientStream: stream}
x := &grpc.GenericClientStream[WatchRequest, WatchEvent]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
@ -124,26 +136,12 @@ func (c *resourceStoreClient) Watch(ctx context.Context, in *WatchRequest, opts
return x, nil
}
type ResourceStore_WatchClient interface {
Recv() (*WatchEvent, error)
grpc.ClientStream
}
type resourceStoreWatchClient struct {
grpc.ClientStream
}
func (x *resourceStoreWatchClient) Recv() (*WatchEvent, error) {
m := new(WatchEvent)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type ResourceStore_WatchClient = grpc.ServerStreamingClient[WatchEvent]
// ResourceStoreServer is the server API for ResourceStore service.
// All implementations should embed UnimplementedResourceStoreServer
// for forward compatibility
// for forward compatibility.
//
// This provides the CRUD+List+Watch support needed for a k8s apiserver
// The semantics and behaviors of this service are constrained by kubernetes
@ -154,6 +152,7 @@ type ResourceStoreServer interface {
Create(context.Context, *CreateRequest) (*CreateResponse, error)
Update(context.Context, *UpdateRequest) (*UpdateResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
Restore(context.Context, *RestoreRequest) (*RestoreResponse, error)
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
@ -161,12 +160,15 @@ type ResourceStoreServer interface {
// The results *may* include values that should not be returned to the user
// This will perform best-effort filtering to increase performace.
// NOTE: storage.Interface is ultimatly responsible for the final filtering
Watch(*WatchRequest, ResourceStore_WatchServer) error
Watch(*WatchRequest, grpc.ServerStreamingServer[WatchEvent]) error
}
// UnimplementedResourceStoreServer should be embedded to have forward compatible implementations.
type UnimplementedResourceStoreServer struct {
}
// UnimplementedResourceStoreServer should be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedResourceStoreServer struct{}
func (UnimplementedResourceStoreServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented")
@ -180,12 +182,16 @@ func (UnimplementedResourceStoreServer) Update(context.Context, *UpdateRequest)
func (UnimplementedResourceStoreServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedResourceStoreServer) Restore(context.Context, *RestoreRequest) (*RestoreResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Restore not implemented")
}
func (UnimplementedResourceStoreServer) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedResourceStoreServer) Watch(*WatchRequest, ResourceStore_WatchServer) error {
func (UnimplementedResourceStoreServer) Watch(*WatchRequest, grpc.ServerStreamingServer[WatchEvent]) error {
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
func (UnimplementedResourceStoreServer) testEmbeddedByValue() {}
// UnsafeResourceStoreServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ResourceStoreServer will
@ -195,6 +201,13 @@ type UnsafeResourceStoreServer interface {
}
func RegisterResourceStoreServer(s grpc.ServiceRegistrar, srv ResourceStoreServer) {
// If the following call pancis, it indicates UnimplementedResourceStoreServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&ResourceStore_ServiceDesc, srv)
}
@ -270,6 +283,24 @@ func _ResourceStore_Delete_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_Restore_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RestoreRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ResourceStoreServer).Restore(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ResourceStore_Restore_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ResourceStoreServer).Restore(ctx, req.(*RestoreRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ResourceStore_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
@ -293,21 +324,11 @@ func _ResourceStore_Watch_Handler(srv interface{}, stream grpc.ServerStream) err
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(ResourceStoreServer).Watch(m, &resourceStoreWatchServer{ServerStream: stream})
return srv.(ResourceStoreServer).Watch(m, &grpc.GenericServerStream[WatchRequest, WatchEvent]{ServerStream: stream})
}
type ResourceStore_WatchServer interface {
Send(*WatchEvent) error
grpc.ServerStream
}
type resourceStoreWatchServer struct {
grpc.ServerStream
}
func (x *resourceStoreWatchServer) Send(m *WatchEvent) error {
return x.ServerStream.SendMsg(m)
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type ResourceStore_WatchServer = grpc.ServerStreamingServer[WatchEvent]
// ResourceStore_ServiceDesc is the grpc.ServiceDesc for ResourceStore service.
// It's only intended for direct use with grpc.RegisterService,
@ -332,6 +353,10 @@ var ResourceStore_ServiceDesc = grpc.ServiceDesc{
MethodName: "Delete",
Handler: _ResourceStore_Delete_Handler,
},
{
MethodName: "Restore",
Handler: _ResourceStore_Restore_Handler,
},
{
MethodName: "List",
Handler: _ResourceStore_List_Handler,
@ -420,7 +445,7 @@ func (c *resourceIndexClient) Origin(ctx context.Context, in *OriginRequest, opt
// ResourceIndexServer is the server API for ResourceIndex service.
// All implementations should embed UnimplementedResourceIndexServer
// for forward compatibility
// for forward compatibility.
//
// Unlike the ResourceStore, this service can be exposed to clients directly
// It should be implemented with efficient indexes and does not need read-after-write semantics
@ -434,9 +459,12 @@ type ResourceIndexServer interface {
Origin(context.Context, *OriginRequest) (*OriginResponse, error)
}
// UnimplementedResourceIndexServer should be embedded to have forward compatible implementations.
type UnimplementedResourceIndexServer struct {
}
// UnimplementedResourceIndexServer should be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedResourceIndexServer struct{}
func (UnimplementedResourceIndexServer) Search(context.Context, *ResourceSearchRequest) (*ResourceSearchResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Search not implemented")
@ -450,6 +478,7 @@ func (UnimplementedResourceIndexServer) History(context.Context, *HistoryRequest
func (UnimplementedResourceIndexServer) Origin(context.Context, *OriginRequest) (*OriginResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Origin not implemented")
}
func (UnimplementedResourceIndexServer) testEmbeddedByValue() {}
// UnsafeResourceIndexServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ResourceIndexServer will
@ -459,6 +488,13 @@ type UnsafeResourceIndexServer interface {
}
func RegisterResourceIndexServer(s grpc.ServiceRegistrar, srv ResourceIndexServer) {
// If the following call pancis, it indicates UnimplementedResourceIndexServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&ResourceIndex_ServiceDesc, srv)
}
@ -608,7 +644,7 @@ func (c *blobStoreClient) GetBlob(ctx context.Context, in *GetBlobRequest, opts
// BlobStoreServer is the server API for BlobStore service.
// All implementations should embed UnimplementedBlobStoreServer
// for forward compatibility
// for forward compatibility.
type BlobStoreServer interface {
// Upload a blob that will be saved in a resource
PutBlob(context.Context, *PutBlobRequest) (*PutBlobResponse, error)
@ -617,9 +653,12 @@ type BlobStoreServer interface {
GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error)
}
// UnimplementedBlobStoreServer should be embedded to have forward compatible implementations.
type UnimplementedBlobStoreServer struct {
}
// UnimplementedBlobStoreServer should be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedBlobStoreServer struct{}
func (UnimplementedBlobStoreServer) PutBlob(context.Context, *PutBlobRequest) (*PutBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method PutBlob not implemented")
@ -627,6 +666,7 @@ func (UnimplementedBlobStoreServer) PutBlob(context.Context, *PutBlobRequest) (*
func (UnimplementedBlobStoreServer) GetBlob(context.Context, *GetBlobRequest) (*GetBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetBlob not implemented")
}
func (UnimplementedBlobStoreServer) testEmbeddedByValue() {}
// UnsafeBlobStoreServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to BlobStoreServer will
@ -636,6 +676,13 @@ type UnsafeBlobStoreServer interface {
}
func RegisterBlobStoreServer(s grpc.ServiceRegistrar, srv BlobStoreServer) {
// If the following call pancis, it indicates UnimplementedBlobStoreServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&BlobStore_ServiceDesc, srv)
}
@ -730,7 +777,7 @@ func (c *diagnosticsClient) IsHealthy(ctx context.Context, in *HealthCheckReques
// DiagnosticsServer is the server API for Diagnostics service.
// All implementations should embed UnimplementedDiagnosticsServer
// for forward compatibility
// for forward compatibility.
//
// Clients can use this service directly
// NOTE: This is read only, and no read afer write guarantees
@ -739,13 +786,17 @@ type DiagnosticsServer interface {
IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
}
// UnimplementedDiagnosticsServer should be embedded to have forward compatible implementations.
type UnimplementedDiagnosticsServer struct {
}
// UnimplementedDiagnosticsServer should be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedDiagnosticsServer struct{}
func (UnimplementedDiagnosticsServer) IsHealthy(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method IsHealthy not implemented")
}
func (UnimplementedDiagnosticsServer) testEmbeddedByValue() {}
// UnsafeDiagnosticsServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DiagnosticsServer will
@ -755,6 +806,13 @@ type UnsafeDiagnosticsServer interface {
}
func RegisterDiagnosticsServer(s grpc.ServiceRegistrar, srv DiagnosticsServer) {
// If the following call pancis, it indicates UnimplementedDiagnosticsServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&Diagnostics_ServiceDesc, srv)
}

View File

@ -10,12 +10,14 @@ import (
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"github.com/grafana/authlib/authz"
"github.com/grafana/authlib/claims"
@ -378,7 +380,6 @@ func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *Resour
if oldValue == nil {
event.Type = WatchEvent_ADDED
} else {
event.Type = WatchEvent_MODIFIED
check.Verb = "update"
temp := &unstructured.Unstructured{}
@ -390,6 +391,13 @@ func (s *server) newEvent(ctx context.Context, user claims.AuthInfo, key *Resour
if err != nil {
return nil, AsErrorResult(err)
}
// restores will restore with a different k8s uid
if event.ObjectOld.GetUID() != obj.GetUID() {
event.Type = WatchEvent_ADDED
} else {
event.Type = WatchEvent_MODIFIED
}
}
if key.Namespace != obj.GetNamespace() {
@ -749,6 +757,125 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
return rsp, err
}
func (s *server) Restore(ctx context.Context, req *RestoreRequest) (*RestoreResponse, error) {
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
// check that the user has access
user, ok := claims.From(ctx)
if !ok || user == nil {
return &RestoreResponse{
Error: &ErrorResult{
Message: "no user found in context",
Code: http.StatusUnauthorized,
}}, nil
}
if err := s.Init(ctx); err != nil {
return nil, err
}
checker, err := s.access.Compile(ctx, user, authz.ListRequest{
Group: req.Key.Group,
Resource: req.Key.Resource,
Namespace: req.Key.Namespace,
})
if err != nil {
return &RestoreResponse{Error: AsErrorResult(err)}, nil
}
if checker == nil {
return &RestoreResponse{Error: &ErrorResult{
Code: http.StatusForbidden,
}}, nil
}
// get the asked for resource version to restore
readRsp, err := s.Read(ctx, &ReadRequest{
Key: req.Key,
ResourceVersion: req.ResourceVersion,
IncludeDeleted: true,
})
if err != nil || readRsp == nil || readRsp.Error != nil {
return &RestoreResponse{
Error: &ErrorResult{
Code: http.StatusNotFound,
Message: fmt.Sprintf("could not find old resource: %s", readRsp.Error.Message),
},
}, nil
}
// generate a new k8s UID when restoring. The name will remain the same
// (for dashboards, this will be the dashboard uid), but since controllers
// will see this as a create event, we do not want the same k8s UID, or
// there may be unintended behavior
newUid := types.UID(uuid.NewString())
tmp := &unstructured.Unstructured{}
err = tmp.UnmarshalJSON(readRsp.Value)
if err != nil {
return &RestoreResponse{
Error: &ErrorResult{
Code: http.StatusNotFound,
Message: fmt.Sprintf("could not unmarhsal: %s", err.Error()),
},
}, nil
}
obj, err := utils.MetaAccessor(tmp)
if err != nil {
return &RestoreResponse{
Error: &ErrorResult{
Code: http.StatusNotFound,
Message: fmt.Sprintf("could not get object: %s", err.Error()),
},
}, nil
}
obj.SetUID(newUid)
rtObj, ok := obj.GetRuntimeObject()
if !ok {
return &RestoreResponse{
Error: &ErrorResult{
Code: http.StatusNotFound,
Message: "could not get runtime object",
},
}, nil
}
newObj, err := json.Marshal(rtObj)
if err != nil {
return &RestoreResponse{
Error: &ErrorResult{
Code: http.StatusNotFound,
Message: fmt.Sprintf("could not marshal object: %s", err.Error()),
},
}, nil
}
// finally, send to the backend to create & update the history of the restored object
event, errRes := s.newEvent(ctx, user, req.Key, newObj, readRsp.Value)
if errRes != nil {
return &RestoreResponse{
Error: &ErrorResult{
Code: http.StatusInternalServerError,
Message: fmt.Sprintf("could not create restore resource event: %s", errRes.Message),
},
}, nil
}
rv, err := s.backend.WriteEvent(ctx, *event)
if err != nil {
return &RestoreResponse{
Error: &ErrorResult{
Code: http.StatusInternalServerError,
Message: fmt.Sprintf("could not restore resource: %s", err.Error()),
},
}, nil
}
return &RestoreResponse{
Error: nil,
ResourceVersion: rv,
}, nil
}
func (s *server) initWatcher() error {
var err error
s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error {

View File

@ -220,4 +220,79 @@ func TestSimpleServer(t *testing.T) {
ResourceVersion: created.ResourceVersion})
require.ErrorIs(t, err, ErrOptimisticLockingFailed)
})
t.Run("playlist restore", func(t *testing.T) {
uid := "zzz"
raw := []byte(`{
"apiVersion": "playlist.grafana.app/v0alpha1",
"kind": "Playlist",
"metadata": {
"name": "fdgsv37qslr0ga",
"namespace": "default",
"uid": "` + uid + `",
"annotations": {
"grafana.app/repoName": "elsewhere",
"grafana.app/repoPath": "path/to/item",
"grafana.app/repoTimestamp": "2024-02-02T00:00:00Z"
}
},
"spec": {
"title": "hello",
"interval": "5m",
"items": [
{
"type": "dashboard_by_uid",
"value": "vmie2cmWz"
}
]
}
}`)
key := &ResourceKey{
Group: "playlist.grafana.app",
Resource: "rrrr",
Namespace: "default",
Name: "fdgsv37qslr0ga",
}
// create
created, err := server.Create(ctx, &CreateRequest{
Value: raw,
Key: key,
})
require.NoError(t, err)
// make sure it exists
found, err := server.Read(ctx, &ReadRequest{Key: key})
require.NoError(t, err)
require.Nil(t, found.Error)
fmt.Println(found.ResourceVersion)
// delete it
deleted, err := server.Delete(ctx, &DeleteRequest{Key: key, ResourceVersion: created.ResourceVersion})
require.NoError(t, err)
require.True(t, deleted.ResourceVersion > created.ResourceVersion)
// restore it
restored, err := server.Restore(ctx, &RestoreRequest{
Key: key,
ResourceVersion: found.ResourceVersion,
})
require.NoError(t, err)
require.Nil(t, restored.Error)
require.True(t, restored.ResourceVersion > deleted.ResourceVersion)
// ensure it exists now
found, err = server.Read(ctx, &ReadRequest{Key: key})
require.NoError(t, err)
require.Nil(t, found.Error)
require.Equal(t, restored.ResourceVersion, found.ResourceVersion)
foundUnstructured := &unstructured.Unstructured{}
err = foundUnstructured.UnmarshalJSON(found.Value)
require.NoError(t, err)
foundObj, err := utils.MetaAccessor(foundUnstructured)
require.NoError(t, err)
// the UID should be different now
require.NotEqual(t, uid, string(foundObj.GetUID()))
})
}

View File

@ -160,6 +160,9 @@ func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (in
// TODO: validate key ?
switch event.Type {
case resource.WatchEvent_ADDED:
if event.ObjectOld != nil {
return b.restore(ctx, event)
}
return b.create(ctx, event)
case resource.WatchEvent_MODIFIED:
return b.update(ctx, event)
@ -349,6 +352,83 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
return newVersion, err
}
func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Restore")
defer span.End()
var newVersion int64
guid := uuid.New().String()
err := b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
// 1. Re-create resource
// Note: we may want to replace the write event with a create event, tbd.
if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource: %w", err)
}
// 2. Insert into resource history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
Folder: folder,
GUID: guid,
}); err != nil {
return fmt.Errorf("insert into resource history: %w", err)
}
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
// 4. Atomically increment resource version for this kind
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
if err != nil {
return fmt.Errorf("increment resource version: %w", err)
}
// 5. Update the RV in both resource and resource_history
if _, err = dbutil.Exec(ctx, tx, sqlResourceHistoryUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update history rv: %w", err)
}
if _, err = dbutil.Exec(ctx, tx, sqlResourceUpdateRV, sqlResourceUpdateRVRequest{
SQLTemplate: sqltemplate.New(b.dialect),
GUID: guid,
ResourceVersion: rv,
}); err != nil {
return fmt.Errorf("update resource rv: %w", err)
}
// 6. Update all resource history entries with the new UID
// Note: we do not update any history entries that have a deletion timestamp included. This will become
// important once we start using finalizers, as the initial delete will show up as an update with a deletion timestamp included.
if _, err = dbutil.Exec(ctx, tx, sqlResoureceHistoryUpdateUid, sqlResourceHistoryUpdateRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
OldUID: string(event.ObjectOld.GetUID()),
NewUID: string(event.Object.GetUID()),
}); err != nil {
return fmt.Errorf("update history uid: %w", err)
}
newVersion = rv
return nil
})
return newVersion, err
}
func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.BackendReadResponse {
_, span := b.tracer.Start(ctx, tracePrefix+".Read")
defer span.End()
@ -371,8 +451,20 @@ func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
res, err = dbutil.QueryRow(ctx, tx, sr, readReq)
// if not found, look for latest deleted version (if requested)
if errors.Is(err, sql.ErrNoRows) && req.IncludeDeleted {
sr = sqlResourceHistoryRead
readReq2 := &sqlResourceReadRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Request: req,
Response: NewReadResponse(),
}
res, err = dbutil.QueryRow(ctx, tx, sr, readReq2)
return err
}
return err
})
if errors.Is(err, sql.ErrNoRows) {
return &resource.BackendReadResponse{
Error: resource.NewNotFoundError(req.Key),

View File

@ -592,3 +592,140 @@ func TestBackend_delete(t *testing.T) {
require.ErrorContains(t, err, "update history rv")
})
}
func TestBackend_restore(t *testing.T) {
t.Parallel()
meta, err := utils.MetaAccessor(&unstructured.Unstructured{
Object: map[string]any{},
})
require.NoError(t, err)
meta.SetUID("new-uid")
oldMeta, err := utils.MetaAccessor(&unstructured.Unstructured{
Object: map[string]any{},
})
require.NoError(t, err)
oldMeta.SetUID("old-uid")
event := resource.WriteEvent{
Type: resource.WatchEvent_ADDED,
Key: resKey,
Object: meta,
ObjectOld: oldMeta,
}
t.Run("happy path", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithResult("update resource", 0, 1)
b.ExecWithResult("update resource_history", 0, 1)
b.SQLMock.ExpectCommit()
v, err := b.restore(ctx, event)
require.NoError(t, err)
require.Equal(t, int64(23456), v)
})
t.Run("error restoring resource", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithErr("insert resource", errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource:")
})
t.Run("error inserting into resource history", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithErr("insert resource_history", errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "insert into resource history")
})
t.Run("error incrementing resource version", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectUnsuccessfulResourceVersionAtomicInc(t, b, errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "increment resource version")
})
t.Run("error updating resource history", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithErr("update resource_history", errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update history rv")
})
t.Run("error updating resource", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithErr("update resource", errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update resource rv")
})
t.Run("error updating resource history uid", func(t *testing.T) {
t.Parallel()
b, ctx := setupBackendTest(t)
b.SQLMock.ExpectBegin()
b.ExecWithResult("insert resource", 0, 1)
b.ExecWithResult("insert resource_history", 0, 1)
expectSuccessfulResourceVersionAtomicInc(t, b)
b.ExecWithResult("update resource_history", 0, 1)
b.ExecWithResult("update resource", 0, 1)
b.ExecWithErr("update resource_history", errTest)
b.SQLMock.ExpectRollback()
v, err := b.restore(ctx, event)
require.Zero(t, v)
require.Error(t, err)
require.ErrorContains(t, err, "update history uid")
})
}

View File

@ -14,8 +14,12 @@ SELECT
AND {{ .Ident "group" }} = {{ .Arg .Request.Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Request.Key.Resource }}
AND {{ .Ident "name" }} = {{ .Arg .Request.Key.Name }}
{{ if .Request.IncludeDeleted }}
AND {{ .Ident "action" }} != 3
AND {{ .Ident "value" }} NOT LIKE '%deletionTimestamp%'
{{ end }}
{{ if gt .Request.ResourceVersion 0 }}
AND {{ .Ident "resource_version" }} <= {{ .Arg .Request.ResourceVersion }}
AND {{ .Ident "resource_version" }} {{ if .Request.IncludeDeleted }}={{ else }}<={{ end }} {{ .Arg .Request.ResourceVersion }}
{{ end }}
ORDER BY {{ .Ident "resource_version" }} DESC
LIMIT 1

View File

@ -0,0 +1,8 @@
UPDATE {{ .Ident "resource_history" }}
SET {{ .Ident "value" }} = REPLACE({{ .Ident "value" }}, CONCAT('"uid":"', {{ .Arg .OldUID }}, '"'), CONCAT('"uid":"', {{ .Arg .NewUID }}, '"'))
WHERE {{ .Ident "name" }} = {{ .Arg .WriteEvent.Key.Name }}
AND {{ .Ident "namespace" }} = {{ .Arg .WriteEvent.Key.Namespace }}
AND {{ .Ident "group" }} = {{ .Arg .WriteEvent.Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .WriteEvent.Key.Resource }}
AND {{ .Ident "action" }} != 3
AND {{ .Ident "value" }} NOT LIKE '%deletionTimestamp%';

View File

@ -37,6 +37,7 @@ var (
sqlResourceUpdateRV = mustTemplate("resource_update_rv.sql")
sqlResourceHistoryRead = mustTemplate("resource_history_read.sql")
sqlResourceHistoryUpdateRV = mustTemplate("resource_history_update_rv.sql")
sqlResoureceHistoryUpdateUid = mustTemplate("resource_history_update_uid.sql")
sqlResourceHistoryInsert = mustTemplate("resource_history_insert.sql")
sqlResourceHistoryPoll = mustTemplate("resource_history_poll.sql")
@ -191,6 +192,19 @@ func (r sqlResourceHistoryListRequest) Results() (*resource.ResourceWrapper, err
}, nil
}
// update resource history
type sqlResourceHistoryUpdateRequest struct {
sqltemplate.SQLTemplate
WriteEvent resource.WriteEvent
OldUID string
NewUID string
}
func (r sqlResourceHistoryUpdateRequest) Validate() error {
return nil // TODO
}
// update RV
type sqlResourceUpdateRVRequest struct {

View File

@ -166,6 +166,26 @@ func TestUnifiedStorageQueries(t *testing.T) {
},
},
sqlResoureceHistoryUpdateUid: {
{
Name: "modify uids in history",
Data: &sqlResourceHistoryUpdateRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
WriteEvent: resource.WriteEvent{
Key: &resource.ResourceKey{
Namespace: "nn",
Group: "gg",
Resource: "rr",
Name: "name",
},
PreviousRV: 1234,
},
OldUID: "old-uid",
NewUID: "new-uid",
},
},
},
sqlResourceHistoryInsert: {
{
Name: "insert into resource_history",

View File

@ -0,0 +1,8 @@
UPDATE `resource_history`
SET `value` = REPLACE(`value`, CONCAT('"uid":"', 'old-uid', '"'), CONCAT('"uid":"', 'new-uid', '"'))
WHERE `name` = 'name'
AND `namespace` = 'nn'
AND `group` = 'gg'
AND `resource` = 'rr'
AND `action` != 3
AND `value` NOT LIKE '%deletionTimestamp%';

View File

@ -0,0 +1,8 @@
UPDATE "resource_history"
SET "value" = REPLACE("value", CONCAT('"uid":"', 'old-uid', '"'), CONCAT('"uid":"', 'new-uid', '"'))
WHERE "name" = 'name'
AND "namespace" = 'nn'
AND "group" = 'gg'
AND "resource" = 'rr'
AND "action" != 3
AND "value" NOT LIKE '%deletionTimestamp%';

View File

@ -0,0 +1,8 @@
UPDATE "resource_history"
SET "value" = REPLACE("value", CONCAT('"uid":"', 'old-uid', '"'), CONCAT('"uid":"', 'new-uid', '"'))
WHERE "name" = 'name'
AND "namespace" = 'nn'
AND "group" = 'gg'
AND "resource" = 'rr'
AND "action" != 3
AND "value" NOT LIKE '%deletionTimestamp%';