K8s: Add storage dual writer (#75403)

This commit is contained in:
Todd Treece
2023-09-26 17:15:15 -04:00
committed by GitHub
parent ebec452f9f
commit bb9e66e671
19 changed files with 553 additions and 195 deletions

View File

@@ -0,0 +1,43 @@
# Grafana Kubernetes compatible API Server
## Basic Setup
```ini
app_mode = development
[feature_toggles]
grafanaAPIServer = true
```
Start Grafana:
```bash
make run
```
## Enable dual write to `etcd`
Start `etcd`:
```bash
make devenv sources=etcd
```
Enable dual write to `etcd`:
```ini
[grafana-apiserver]
etcd_servers = 127.0.0.1:2379
```
### `kubectl` access
From the root of the repository:
```bash
export KUBECONFIG=$PWD/data/k8s/grafana.kubeconfig
kubectl api-resources
```
### Grafana API Access
The Kubernetes compatible API can be accessed using existing Grafana AuthN at: [http://localhost:3000/k8s/apis/](http://localhost:3000/k8s/apis/).

View File

@@ -1,12 +1,9 @@
package grafanaapiserver
import (
"fmt"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
@@ -22,7 +19,8 @@ type APIGroupBuilder interface {
GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory, // pointer?
) *genericapiserver.APIGroupInfo
optsGetter generic.RESTOptionsGetter,
) (*genericapiserver.APIGroupInfo, error)
// Get OpenAPI definitions
GetOpenAPIDefinitions() common.GetOpenAPIDefinitions
@@ -30,34 +28,3 @@ type APIGroupBuilder interface {
// Register additional routes with the server
GetOpenAPIPostProcessor() func(*spec3.OpenAPI) (*spec3.OpenAPI, error)
}
func OrgIdToNamespace(orgId int64) string {
if orgId > 1 {
return fmt.Sprintf("org-%d", orgId)
}
return "default"
}
func NamespaceToOrgID(ns string) (int64, error) {
parts := strings.Split(ns, "-")
switch len(parts) {
case 1:
if parts[0] == "default" {
return 1, nil
}
if parts[0] == "" {
return 0, nil // no orgId, cluster scope
}
return 0, fmt.Errorf("invalid namespace (expected default)")
case 2:
if !(parts[0] == "org" || parts[0] == "tenant") {
return 0, fmt.Errorf("invalid namespace (org|tenant)")
}
n, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid namepscae (%w)", err)
}
return n, nil
}
return 0, fmt.Errorf("invalid namespace (%d parts)", len(parts))
}

View File

@@ -0,0 +1,22 @@
package request
import (
"context"
"strconv"
"k8s.io/apiserver/pkg/endpoints/request"
)
func OrgIDFrom(ctx context.Context) (int64, bool) {
ns := request.NamespaceValue(ctx)
if len(ns) < 5 || ns[:4] != "org-" {
return 0, false
}
orgID, err := strconv.Atoi(ns[4:])
if err != nil {
return 0, false
}
return int64(orgID), true
}

View File

@@ -0,0 +1,62 @@
package request_test
import (
"context"
"testing"
"k8s.io/apiserver/pkg/endpoints/request"
grafanarequest "github.com/grafana/grafana/pkg/services/grafana-apiserver/endpoints/request"
)
func TestOrgIDFrom(t *testing.T) {
tests := []struct {
name string
ctx context.Context
expected int64
ok bool
}{
{
name: "empty namespace",
ctx: context.Background(),
expected: 0,
ok: false,
},
{
name: "incorrect number of parts",
ctx: request.WithNamespace(context.Background(), "org-123-a"),
expected: 0,
ok: false,
},
{
name: "incorrect prefix",
ctx: request.WithNamespace(context.Background(), "abc-123"),
expected: 0,
ok: false,
},
{
name: "org id not a number",
ctx: request.WithNamespace(context.Background(), "org-invalid"),
expected: 0,
ok: false,
},
{
name: "valid org id",
ctx: request.WithNamespace(context.Background(), "org-123"),
expected: 123,
ok: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual, ok := grafanarequest.OrgIDFrom(tt.ctx)
if actual != tt.expected {
t.Errorf("OrgIDFrom() returned %d, expected %d", actual, tt.expected)
}
if ok != tt.ok {
t.Errorf("OrgIDFrom() returned %t, expected %t", ok, tt.ok)
}
})
}
}

View File

@@ -0,0 +1,79 @@
package generic
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
)
type genericStrategy struct {
runtime.ObjectTyper
names.NameGenerator
}
// NewStrategy creates and returns a genericStrategy instance.
func NewStrategy(typer runtime.ObjectTyper) genericStrategy {
return genericStrategy{typer, names.SimpleNameGenerator}
}
// NamespaceScoped returns true because all Generic resources must be within a namespace.
func (genericStrategy) NamespaceScoped() bool {
return true
}
func (genericStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {}
func (genericStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {}
func (genericStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
return field.ErrorList{}
}
// WarningsOnCreate returns warnings for the creation of the given object.
func (genericStrategy) WarningsOnCreate(ctx context.Context, obj runtime.Object) []string { return nil }
func (genericStrategy) AllowCreateOnUpdate() bool {
return false
}
func (genericStrategy) AllowUnconditionalUpdate() bool {
return false
}
func (genericStrategy) Canonicalize(obj runtime.Object) {}
func (genericStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
return field.ErrorList{}
}
// WarningsOnUpdate returns warnings for the given update.
func (genericStrategy) WarningsOnUpdate(ctx context.Context, obj, old runtime.Object) []string {
return nil
}
// GetAttrs returns labels and fields of an object.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, nil, err
}
fieldsSet := fields.Set{
"metadata.name": accessor.GetName(),
}
return labels.Set(accessor.GetLabels()), fieldsSet, nil
}
// Matcher returns a generic.SelectionPredicate that matches on label and field selectors.
func Matcher(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@@ -0,0 +1,113 @@
package rest
import (
"context"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
)
var (
_ rest.Storage = (*DualWriter)(nil)
_ rest.Scoper = (*DualWriter)(nil)
_ rest.TableConvertor = (*DualWriter)(nil)
_ rest.CreaterUpdater = (*DualWriter)(nil)
_ rest.CollectionDeleter = (*DualWriter)(nil)
_ rest.GracefulDeleter = (*DualWriter)(nil)
_ rest.SingularNameProvider = (*DualWriter)(nil)
)
// Storage is a storage implementation that satisfies the same interfaces as genericregistry.Store.
type Storage interface {
rest.Storage
rest.StandardStorage
rest.Scoper
rest.TableConvertor
rest.SingularNameProvider
}
// LegacyStorage is a storage implementation that writes to the Grafana SQL database.
type LegacyStorage interface {
rest.Storage
rest.Scoper
rest.SingularNameProvider
rest.TableConvertor
}
// DualWriter is a storage implementation that writes first to LegacyStorage and then to Storage.
// If writing to LegacyStorage fails, the write to Storage is skipped and the error is returned.
// Storage is used for all read operations.
//
// The LegacyStorage implementation must implement the following interfaces:
// - rest.Storage
// - rest.TableConvertor
// - rest.Scoper
// - rest.SingularNameProvider
//
// These interfaces are optional, but they all should be implemented to fully support dual writes:
// - rest.Creater
// - rest.Updater
// - rest.GracefulDeleter
// - rest.CollectionDeleter
type DualWriter struct {
Storage
legacy LegacyStorage
}
// NewDualWriter returns a new DualWriter.
func NewDualWriter(legacy LegacyStorage, storage Storage) *DualWriter {
return &DualWriter{
Storage: storage,
legacy: legacy,
}
}
// Create overrides the default behavior of the Storage and writes to both the LegacyStorage and Storage.
func (d *DualWriter) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if legacy, ok := d.legacy.(rest.Creater); ok {
_, err := legacy.Create(ctx, obj, createValidation, options)
if err != nil {
return nil, err
}
}
return d.Storage.Create(ctx, obj, createValidation, options)
}
// Update overrides the default behavior of the Storage and writes to both the LegacyStorage and Storage.
func (d *DualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
if legacy, ok := d.legacy.(rest.Updater); ok {
_, _, err := legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
return nil, false, err
}
}
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
}
// Delete overrides the default behavior of the Storage and delete from both the LegacyStorage and Storage.
func (d *DualWriter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
if legacy, ok := d.legacy.(rest.GracefulDeleter); ok {
_, _, err := legacy.Delete(ctx, name, deleteValidation, options)
if err != nil {
return nil, false, err
}
}
return d.Storage.Delete(ctx, name, deleteValidation, options)
}
// DeleteCollection overrides the default behavior of the Storage and delete from both the LegacyStorage and Storage.
func (d *DualWriter) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
if legacy, ok := d.legacy.(rest.CollectionDeleter); ok {
_, err := legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
return nil, err
}
}
return d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
}

View File

@@ -90,7 +90,8 @@ type RestConfigProvider interface {
type service struct {
*services.BasicService
restConfig *clientrest.Config
restConfig *clientrest.Config
etcd_servers []string
enabled bool
dataPath string
@@ -102,15 +103,17 @@ type service struct {
builders []APIGroupBuilder
}
func ProvideService(cfg *setting.Cfg,
func ProvideService(
cfg *setting.Cfg,
rr routing.RouteRegister,
) (*service, error) {
s := &service{
enabled: cfg.IsFeatureToggleEnabled(featuremgmt.FlagGrafanaAPIServer),
rr: rr,
dataPath: path.Join(cfg.DataPath, "k8s"),
stopCh: make(chan struct{}),
builders: []APIGroupBuilder{},
etcd_servers: cfg.SectionWithEnvOverrides("grafana-apiserver").Key("etcd_servers").Strings(","),
enabled: cfg.IsFeatureToggleEnabled(featuremgmt.FlagGrafanaAPIServer),
rr: rr,
dataPath: path.Join(cfg.DataPath, "k8s"),
stopCh: make(chan struct{}),
builders: []APIGroupBuilder{},
}
// This will be used when running as a dskit service
@@ -170,9 +173,13 @@ func (s *service) start(ctx context.Context) error {
o.Authorization.RemoteKubeConfigFileOptional = true
o.Authorization.AlwaysAllowPaths = []string{"*"}
o.Authorization.AlwaysAllowGroups = []string{user.SystemPrivilegedGroup, "grafana"}
o.Etcd = nil
o.Etcd.StorageConfig.Transport.ServerList = s.etcd_servers
o.Admission = nil
o.CoreAPI = nil
if len(o.Etcd.StorageConfig.Transport.ServerList) == 0 {
o.Etcd = nil
}
// Get the util to get the paths to pre-generated certs
certUtil := certgenerator.CertUtil{
@@ -246,7 +253,11 @@ func (s *service) start(ctx context.Context) error {
// Install the API Group+version
for _, b := range builders {
err = server.InstallAPIGroup(b.GetAPIGroupInfo(Scheme, Codecs))
g, err := b.GetAPIGroupInfo(Scheme, Codecs, serverConfig.RESTOptionsGetter)
if err != nil {
return err
}
err = server.InstallAPIGroup(g)
if err != nil {
return err
}