K8s: Add apimachinery and apiserver packages (#83190)

This commit is contained in:
Todd Treece
2024-02-23 15:15:43 -05:00
committed by GitHub
parent f4b432841b
commit e5a26a3f7c
84 changed files with 701 additions and 1020 deletions

View File

@@ -1,69 +0,0 @@
package builder
import (
"net/http"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
)
// TODO: this (or something like it) belongs in grafana-app-sdk,
// but lets keep it here while we iterate on a few simple examples
type APIGroupBuilder interface {
// Get the main group name
GetGroupVersion() schema.GroupVersion
// Add the kinds to the server scheme
InstallSchema(scheme *runtime.Scheme) error
// Build the group+version behavior
GetAPIGroupInfo(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
optsGetter generic.RESTOptionsGetter,
dualWrite bool,
) (*genericapiserver.APIGroupInfo, error)
// Get OpenAPI definitions
GetOpenAPIDefinitions() common.GetOpenAPIDefinitions
// Get the API routes for each version
GetAPIRoutes() *APIRoutes
// Optionally add an authorization hook
// Standard namespace checking will happen before this is called, specifically
// the namespace must matches an org|stack that the user belongs to
GetAuthorizer() authorizer.Authorizer
}
// Builders that implement OpenAPIPostProcessor are given a chance to modify the schema directly
type OpenAPIPostProcessor interface {
PostProcessOpenAPI(*spec3.OpenAPI) (*spec3.OpenAPI, error)
}
// This is used to implement dynamic sub-resources like pods/x/logs
type APIRouteHandler struct {
Path string // added to the appropriate level
Spec *spec3.PathProps // Exposed in the open api service discovery
Handler http.HandlerFunc // when Level = resource, the resource will be available in context
}
// APIRoutes define explicit HTTP handlers in an apiserver
// TBD: is this actually necessary -- there may be more k8s native options for this
type APIRoutes struct {
// Root handlers are registered directly after the apiVersion identifier
Root []APIRouteHandler
// Namespace handlers are mounted under the namespace
Namespace []APIRouteHandler
}
type APIRegistrar interface {
RegisterAPI(builder APIGroupBuilder)
}

View File

@@ -1,145 +0,0 @@
package builder
import (
"fmt"
"net/http"
goruntime "runtime"
"runtime/debug"
"strconv"
"strings"
"time"
"golang.org/x/mod/semver"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/util/openapi"
k8sscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/kube-openapi/pkg/common"
"github.com/grafana/grafana/pkg/setting"
)
func SetupConfig(
scheme *runtime.Scheme,
serverConfig *genericapiserver.RecommendedConfig,
builders []APIGroupBuilder,
) error {
defsGetter := GetOpenAPIDefinitions(builders)
serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(
openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(defsGetter),
openapinamer.NewDefinitionNamer(scheme, k8sscheme.Scheme))
serverConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(
openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(defsGetter),
openapinamer.NewDefinitionNamer(scheme, k8sscheme.Scheme))
// Add the custom routes to service discovery
serverConfig.OpenAPIV3Config.PostProcessSpec = getOpenAPIPostProcessor(builders)
serverConfig.OpenAPIV3Config.GetOperationIDAndTagsFromRoute = func(r common.Route) (string, []string, error) {
tags := []string{}
prop, ok := r.Metadata()["x-kubernetes-group-version-kind"]
if ok {
gvk, ok := prop.(metav1.GroupVersionKind)
if ok && gvk.Kind != "" {
tags = append(tags, gvk.Kind)
}
}
return r.OperationName(), tags, nil
}
// Set the swagger build versions
serverConfig.OpenAPIConfig.Info.Version = setting.BuildVersion
serverConfig.OpenAPIV3Config.Info.Version = setting.BuildVersion
serverConfig.SkipOpenAPIInstallation = false
serverConfig.BuildHandlerChainFunc = func(delegateHandler http.Handler, c *genericapiserver.Config) http.Handler {
// Call DefaultBuildHandlerChain on the main entrypoint http.Handler
// See https://github.com/kubernetes/apiserver/blob/v0.28.0/pkg/server/config.go#L906
// DefaultBuildHandlerChain provides many things, notably CORS, HSTS, cache-control, authz and latency tracking
requestHandler, err := getAPIHandler(
delegateHandler,
c.LoopbackClientConfig,
builders)
if err != nil {
panic(fmt.Sprintf("could not build handler chain func: %s", err.Error()))
}
return genericapiserver.DefaultBuildHandlerChain(requestHandler, c)
}
k8sVersion, err := getK8sApiserverVersion()
if err != nil {
return err
}
before, after, _ := strings.Cut(setting.BuildVersion, ".")
serverConfig.Version = &version.Info{
Major: before,
Minor: after,
GoVersion: goruntime.Version(),
Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH),
Compiler: goruntime.Compiler,
GitTreeState: setting.BuildBranch,
GitCommit: setting.BuildCommit,
BuildDate: time.Unix(setting.BuildStamp, 0).UTC().Format(time.DateTime),
GitVersion: k8sVersion,
}
return nil
}
func InstallAPIs(
scheme *runtime.Scheme,
codecs serializer.CodecFactory,
server *genericapiserver.GenericAPIServer,
optsGetter generic.RESTOptionsGetter,
builders []APIGroupBuilder,
dualWrite bool,
) error {
for _, b := range builders {
g, err := b.GetAPIGroupInfo(scheme, codecs, optsGetter, dualWrite)
if err != nil {
return err
}
if g == nil || len(g.PrioritizedVersions) < 1 {
continue
}
err = server.InstallAPIGroup(g)
if err != nil {
return err
}
}
return nil
}
// find the k8s version according to build info
func getK8sApiserverVersion() (string, error) {
bi, ok := debug.ReadBuildInfo()
if !ok {
return "", fmt.Errorf("debug.ReadBuildInfo() failed")
}
if len(bi.Deps) == 0 {
return "v?.?", nil // this is normal while debugging
}
for _, dep := range bi.Deps {
if dep.Path == "k8s.io/apiserver" {
if !semver.IsValid(dep.Version) {
return "", fmt.Errorf("invalid semantic version for k8s.io/apiserver")
}
// v0 => v1
majorVersion := strings.TrimPrefix(semver.Major(dep.Version), "v")
majorInt, err := strconv.Atoi(majorVersion)
if err != nil {
return "", fmt.Errorf("could not convert majorVersion to int. majorVersion: %s", majorVersion)
}
newMajor := fmt.Sprintf("v%d", majorInt+1)
return strings.Replace(dep.Version, semver.Major(dep.Version), newMajor, 1), nil
}
}
return "", fmt.Errorf("could not find k8s.io/apiserver in build info")
}

View File

@@ -1,114 +0,0 @@
package builder
import (
"maps"
"strings"
common "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/spec3"
spec "k8s.io/kube-openapi/pkg/validation/spec"
"github.com/grafana/grafana/pkg/apis/common/v0alpha1"
"github.com/grafana/grafana/pkg/setting"
)
// This should eventually live in grafana-app-sdk
func GetOpenAPIDefinitions(builders []APIGroupBuilder) common.GetOpenAPIDefinitions {
return func(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition {
defs := v0alpha1.GetOpenAPIDefinitions(ref) // common grafana apis
for _, b := range builders {
g := b.GetOpenAPIDefinitions()
if g != nil {
out := g(ref)
maps.Copy(defs, out)
}
}
return defs
}
}
// Modify the the OpenAPI spec to include the additional routes.
// Currently this requires: https://github.com/kubernetes/kube-openapi/pull/420
// In future k8s release, the hook will use Config3 rather than the same hook for both v2 and v3
func getOpenAPIPostProcessor(builders []APIGroupBuilder) func(*spec3.OpenAPI) (*spec3.OpenAPI, error) {
return func(s *spec3.OpenAPI) (*spec3.OpenAPI, error) {
if s.Paths == nil {
return s, nil
}
for _, b := range builders {
routes := b.GetAPIRoutes()
gv := b.GetGroupVersion()
prefix := "/apis/" + gv.String() + "/"
if s.Paths.Paths[prefix] != nil {
copy := spec3.OpenAPI{
Version: s.Version,
Info: &spec.Info{
InfoProps: spec.InfoProps{
Title: gv.String(),
Version: setting.BuildVersion,
},
},
Components: s.Components,
ExternalDocs: s.ExternalDocs,
Servers: s.Servers,
Paths: s.Paths,
}
if routes == nil {
routes = &APIRoutes{}
}
for _, route := range routes.Root {
copy.Paths.Paths[prefix+route.Path] = &spec3.Path{
PathProps: *route.Spec,
}
}
for _, route := range routes.Namespace {
copy.Paths.Paths[prefix+"namespaces/{namespace}/"+route.Path] = &spec3.Path{
PathProps: *route.Spec,
}
}
// Make the sub-resources (connect) share the same tags as the main resource
for path, spec := range copy.Paths.Paths {
idx := strings.LastIndex(path, "{name}/")
if idx > 0 {
parent := copy.Paths.Paths[path[:idx+6]]
if parent != nil && parent.Get != nil {
for _, op := range GetPathOperations(spec) {
if op != nil && op.Extensions != nil {
action, ok := op.Extensions.GetString("x-kubernetes-action")
if ok && action == "connect" {
op.Tags = parent.Get.Tags
}
}
}
}
}
}
// Support direct manipulation of API results
processor, ok := b.(OpenAPIPostProcessor)
if ok {
return processor.PostProcessOpenAPI(&copy)
}
return &copy, nil
}
}
return s, nil
}
}
func GetPathOperations(path *spec3.Path) []*spec3.Operation {
return []*spec3.Operation{
path.Get,
path.Head,
path.Delete,
path.Patch,
path.Post,
path.Put,
path.Trace,
path.Options,
}
}

View File

@@ -1,115 +0,0 @@
package builder
import (
"fmt"
"net/http"
"github.com/gorilla/mux"
restclient "k8s.io/client-go/rest"
"k8s.io/kube-openapi/pkg/spec3"
)
type requestHandler struct {
router *mux.Router
}
func getAPIHandler(delegateHandler http.Handler, restConfig *restclient.Config, builders []APIGroupBuilder) (http.Handler, error) {
useful := false // only true if any routes exist anywhere
router := mux.NewRouter()
for _, builder := range builders {
routes := builder.GetAPIRoutes()
if routes == nil {
continue
}
gv := builder.GetGroupVersion()
prefix := "/apis/" + gv.String()
// Root handlers
var sub *mux.Router
for _, route := range routes.Root {
if sub == nil {
sub = router.PathPrefix(prefix).Subrouter()
sub.MethodNotAllowedHandler = &methodNotAllowedHandler{}
}
useful = true
methods, err := methodsFromSpec(route.Path, route.Spec)
if err != nil {
return nil, err
}
sub.HandleFunc("/"+route.Path, route.Handler).
Methods(methods...)
}
// Namespace handlers
sub = nil
prefix += "/namespaces/{namespace}"
for _, route := range routes.Namespace {
if sub == nil {
sub = router.PathPrefix(prefix).Subrouter()
sub.MethodNotAllowedHandler = &methodNotAllowedHandler{}
}
useful = true
methods, err := methodsFromSpec(route.Path, route.Spec)
if err != nil {
return nil, err
}
sub.HandleFunc("/"+route.Path, route.Handler).
Methods(methods...)
}
}
if !useful {
return delegateHandler, nil
}
// Per Gorilla Mux issue here: https://github.com/gorilla/mux/issues/616#issuecomment-798807509
// default handler must come last
router.PathPrefix("/").Handler(delegateHandler)
return &requestHandler{
router: router,
}, nil
}
func (h *requestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.router.ServeHTTP(w, req)
}
func methodsFromSpec(slug string, props *spec3.PathProps) ([]string, error) {
if props == nil {
return []string{"GET", "POST", "PUT", "PATCH", "DELETE"}, nil
}
methods := make([]string, 0)
if props.Get != nil {
methods = append(methods, "GET")
}
if props.Post != nil {
methods = append(methods, "POST")
}
if props.Put != nil {
methods = append(methods, "PUT")
}
if props.Patch != nil {
methods = append(methods, "PATCH")
}
if props.Delete != nil {
methods = append(methods, "DELETE")
}
if len(methods) == 0 {
return nil, fmt.Errorf("invalid OpenAPI Spec for slug=%s without any methods in PathProps", slug)
}
return methods, nil
}
type methodNotAllowedHandler struct{}
func (h *methodNotAllowedHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(405) // method not allowed
}

View File

@@ -1,132 +0,0 @@
package responsewriter
import (
"bufio"
"fmt"
"io"
"net/http"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/klog/v2"
)
var _ responsewriter.CloseNotifierFlusher = (*ResponseAdapter)(nil)
var _ http.ResponseWriter = (*ResponseAdapter)(nil)
var _ io.ReadCloser = (*ResponseAdapter)(nil)
func WrapHandler(handler http.Handler) func(req *http.Request) (*http.Response, error) {
// ignore the lint error because the response is passed directly to the client,
// so the client will be responsible for closing the response body.
//nolint:bodyclose
return func(req *http.Request) (*http.Response, error) {
w := NewAdapter(req)
resp := w.Response()
go func() {
handler.ServeHTTP(w, req)
if err := w.CloseWriter(); err != nil {
klog.Errorf("error closing writer: %v", err)
}
}()
return resp, nil
}
}
// ResponseAdapter is an implementation of [http.ResponseWriter] that allows conversion to a [http.Response].
type ResponseAdapter struct {
req *http.Request
res *http.Response
reader io.ReadCloser
writer io.WriteCloser
buffered *bufio.ReadWriter
}
// NewAdapter returns an initialized [ResponseAdapter].
func NewAdapter(req *http.Request) *ResponseAdapter {
r, w := io.Pipe()
writer := bufio.NewWriter(w)
reader := bufio.NewReader(r)
buffered := bufio.NewReadWriter(reader, writer)
return &ResponseAdapter{
req: req,
res: &http.Response{
Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
Header: make(http.Header),
},
reader: r,
writer: w,
buffered: buffered,
}
}
// Header implements [http.ResponseWriter].
// It returns the response headers to mutate within a handler.
func (ra *ResponseAdapter) Header() http.Header {
return ra.res.Header
}
// Write implements [http.ResponseWriter].
func (ra *ResponseAdapter) Write(buf []byte) (int, error) {
return ra.buffered.Write(buf)
}
// Read implements [io.Reader].
func (ra *ResponseAdapter) Read(buf []byte) (int, error) {
return ra.buffered.Read(buf)
}
// WriteHeader implements [http.ResponseWriter].
func (ra *ResponseAdapter) WriteHeader(code int) {
ra.res.StatusCode = code
ra.res.Status = fmt.Sprintf("%03d %s", code, http.StatusText(code))
}
// Flush implements [http.Flusher].
func (ra *ResponseAdapter) Flush() {
if ra.buffered.Writer.Buffered() == 0 {
return
}
if err := ra.buffered.Writer.Flush(); err != nil {
klog.Error("Error flushing response buffer: ", "error", err)
}
}
// Response returns the [http.Response] generated by the [http.Handler].
func (ra *ResponseAdapter) Response() *http.Response {
// make sure to set the status code to 200 if the request is a watch
// this is to ensure that client-go uses a streamwatcher:
// https://github.com/kubernetes/client-go/blob/76174b8af8cfd938018b04198595d65b48a69334/rest/request.go#L737
if ra.res.StatusCode == 0 && ra.req.URL.Query().Get("watch") == "true" {
ra.WriteHeader(http.StatusOK)
}
ra.res.Body = ra
return ra.res
}
// Decorate implements [responsewriter.UserProvidedDecorator].
func (ra *ResponseAdapter) Unwrap() http.ResponseWriter {
return ra
}
// CloseNotify implements [http.CloseNotifier].
func (ra *ResponseAdapter) CloseNotify() <-chan bool {
ch := make(chan bool)
go func() {
<-ra.req.Context().Done()
ch <- true
}()
return ch
}
// Close implements [io.Closer].
func (ra *ResponseAdapter) Close() error {
return ra.reader.Close()
}
// CloseWriter should be called after the http.Handler has returned.
func (ra *ResponseAdapter) CloseWriter() error {
ra.Flush()
return ra.writer.Close()
}

View File

@@ -1,136 +0,0 @@
package responsewriter_test
import (
"io"
"math/rand"
"net/http"
"testing"
"time"
grafanaresponsewriter "github.com/grafana/grafana/pkg/services/apiserver/endpoints/responsewriter"
"github.com/stretchr/testify/require"
)
func TestResponseAdapter(t *testing.T) {
t.Run("should handle synchronous write", func(t *testing.T) {
client := &http.Client{
Transport: &roundTripperFunc{
ready: make(chan struct{}),
// ignore the lint error because the response is passed directly to the client,
// so the client will be responsible for closing the response body.
//nolint:bodyclose
fn: grafanaresponsewriter.WrapHandler(http.HandlerFunc(syncHandler)),
},
}
close(client.Transport.(*roundTripperFunc).ready)
req, err := http.NewRequest("GET", "http://localhost/test", nil)
require.NoError(t, err)
resp, err := client.Do(req)
require.NoError(t, err)
defer func() {
err := resp.Body.Close()
require.NoError(t, err)
}()
bodyBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "OK", string(bodyBytes))
})
t.Run("should handle synchronous write", func(t *testing.T) {
generateRandomStrings(10)
client := &http.Client{
Transport: &roundTripperFunc{
ready: make(chan struct{}),
// ignore the lint error because the response is passed directly to the client,
// so the client will be responsible for closing the response body.
//nolint:bodyclose
fn: grafanaresponsewriter.WrapHandler(http.HandlerFunc(asyncHandler)),
},
}
close(client.Transport.(*roundTripperFunc).ready)
req, err := http.NewRequest("GET", "http://localhost/test?watch=true", nil)
require.NoError(t, err)
resp, err := client.Do(req)
require.NoError(t, err)
defer func() {
err := resp.Body.Close()
require.NoError(t, err)
}()
// ensure that watch request is a 200
require.Equal(t, http.StatusOK, resp.StatusCode)
// limit to 100 bytes to test the reader buffer
buf := make([]byte, 100)
// holds the read bytes between iterations
cache := []byte{}
for i := 0; i < 10; {
n, err := resp.Body.Read(buf)
require.NoError(t, err)
if n == 0 {
continue
}
cache = append(cache, buf[:n]...)
if len(cache) >= len(randomStrings[i]) {
str := cache[:len(randomStrings[i])]
require.Equal(t, randomStrings[i], string(str))
cache = cache[len(randomStrings[i]):]
i++
}
}
})
}
func syncHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
}
func asyncHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
for _, s := range randomStrings {
time.Sleep(100 * time.Millisecond)
// write the current iteration
_, _ = w.Write([]byte(s))
w.(http.Flusher).Flush()
}
}
var randomStrings = []string{}
func generateRandomStrings(n int) {
for i := 0; i < n; i++ {
randomString := generateRandomString(1000 * (i + 1))
randomStrings = append(randomStrings, randomString)
}
}
func generateRandomString(n int) string {
gen := rand.New(rand.NewSource(time.Now().UnixNano()))
var chars = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
b := make([]rune, n)
for i := range b {
b[i] = chars[gen.Intn(len(chars))]
}
return string(b)
}
type roundTripperFunc struct {
ready chan struct{}
fn func(req *http.Request) (*http.Response, error)
}
func (f *roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
if f.fn == nil {
<-f.ready
}
res, err := f.fn(req)
return res, err
}

View File

@@ -18,7 +18,7 @@ import (
"k8s.io/kube-openapi/pkg/common"
servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1"
filestorage "github.com/grafana/grafana/pkg/services/apiserver/storage/file"
filestorage "github.com/grafana/grafana/pkg/apiserver/storage/file"
)
// AggregatorServerOptions contains the state for the aggregator apiserver

View File

@@ -1,79 +0,0 @@
package generic
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"
)
type genericStrategy struct {
runtime.ObjectTyper
names.NameGenerator
}
// NewStrategy creates and returns a genericStrategy instance.
func NewStrategy(typer runtime.ObjectTyper) genericStrategy {
return genericStrategy{typer, names.SimpleNameGenerator}
}
// NamespaceScoped returns true because all Generic resources must be within a namespace.
func (genericStrategy) NamespaceScoped() bool {
return true
}
func (genericStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {}
func (genericStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {}
func (genericStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
return field.ErrorList{}
}
// WarningsOnCreate returns warnings for the creation of the given object.
func (genericStrategy) WarningsOnCreate(ctx context.Context, obj runtime.Object) []string { return nil }
func (genericStrategy) AllowCreateOnUpdate() bool {
return true
}
func (genericStrategy) AllowUnconditionalUpdate() bool {
return true
}
func (genericStrategy) Canonicalize(obj runtime.Object) {}
func (genericStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
return field.ErrorList{}
}
// WarningsOnUpdate returns warnings for the given update.
func (genericStrategy) WarningsOnUpdate(ctx context.Context, obj, old runtime.Object) []string {
return nil
}
// GetAttrs returns labels and fields of an object.
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return nil, nil, err
}
fieldsSet := fields.Set{
"metadata.name": accessor.GetName(),
}
return labels.Set(accessor.GetLabels()), fieldsSet, nil
}
// Matcher returns a generic.SelectionPredicate that matches on label and field selectors.
func Matcher(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

View File

@@ -1,195 +0,0 @@
package rest
import (
"context"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"github.com/grafana/grafana/pkg/infra/log"
)
var (
_ rest.Storage = (*DualWriter)(nil)
_ rest.Scoper = (*DualWriter)(nil)
_ rest.TableConvertor = (*DualWriter)(nil)
_ rest.CreaterUpdater = (*DualWriter)(nil)
_ rest.CollectionDeleter = (*DualWriter)(nil)
_ rest.GracefulDeleter = (*DualWriter)(nil)
_ rest.SingularNameProvider = (*DualWriter)(nil)
)
// Storage is a storage implementation that satisfies the same interfaces as genericregistry.Store.
type Storage interface {
rest.Storage
rest.StandardStorage
rest.Scoper
rest.TableConvertor
rest.SingularNameProvider
rest.Getter
}
// LegacyStorage is a storage implementation that writes to the Grafana SQL database.
type LegacyStorage interface {
rest.Storage
rest.Scoper
rest.SingularNameProvider
rest.TableConvertor
rest.Getter
}
// DualWriter is a storage implementation that writes first to LegacyStorage and then to Storage.
// If writing to LegacyStorage fails, the write to Storage is skipped and the error is returned.
// Storage is used for all read operations. This is useful as a migration step from SQL based
// legacy storage to a more standard kubernetes backed storage interface.
//
// NOTE: Only values supported by legacy storage will be preserved in the CREATE/UPDATE commands.
// For example, annotations, labels, and managed fields may not be preserved. Everything in upstream
// storage can be recrated from the data in legacy storage.
//
// The LegacyStorage implementation must implement the following interfaces:
// - rest.Storage
// - rest.TableConvertor
// - rest.Scoper
// - rest.SingularNameProvider
//
// These interfaces are optional, but they all should be implemented to fully support dual writes:
// - rest.Creater
// - rest.Updater
// - rest.GracefulDeleter
// - rest.CollectionDeleter
type DualWriter struct {
Storage
legacy LegacyStorage
log log.Logger
}
// NewDualWriter returns a new DualWriter.
func NewDualWriter(legacy LegacyStorage, storage Storage) *DualWriter {
return &DualWriter{
Storage: storage,
legacy: legacy,
log: log.New("grafana-apiserver.dualwriter"),
}
}
// Create overrides the default behavior of the Storage and writes to both the LegacyStorage and Storage.
func (d *DualWriter) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if legacy, ok := d.legacy.(rest.Creater); ok {
created, err := legacy.Create(ctx, obj, createValidation, options)
if err != nil {
return nil, err
}
accessor, err := meta.Accessor(created)
if err != nil {
return created, err
}
accessor.SetResourceVersion("")
accessor.SetUID("")
rsp, err := d.Storage.Create(ctx, created, createValidation, options)
if err != nil {
d.log.Error("unable to create object in duplicate storage", "error", err)
}
return rsp, err
}
return d.Storage.Create(ctx, obj, createValidation, options)
}
// Update overrides the default behavior of the Storage and writes to both the LegacyStorage and Storage.
func (d *DualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
if legacy, ok := d.legacy.(rest.Updater); ok {
// Get the previous version from k8s storage (the one)
old, err := d.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
return nil, false, err
}
accessor, err := meta.Accessor(old)
if err != nil {
return nil, false, err
}
// Hold on to the RV+UID for the dual write
theRV := accessor.GetResourceVersion()
theUID := accessor.GetUID()
// Changes applied within new storage
// will fail if RV is out of sync
updated, err := objInfo.UpdatedObject(ctx, old)
if err != nil {
return nil, false, err
}
accessor, err = meta.Accessor(updated)
if err != nil {
return nil, false, err
}
accessor.SetUID("") // clear it
accessor.SetResourceVersion("") // remove it so it is not a constraint
obj, created, err := legacy.Update(ctx, name, &updateWrapper{
upstream: objInfo,
updated: updated, // returned as the object that will be updated
}, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
return obj, created, err
}
accessor, err = meta.Accessor(obj)
if err != nil {
return nil, false, err
}
accessor.SetResourceVersion(theRV) // the original RV
accessor.SetUID(theUID)
objInfo = &updateWrapper{
upstream: objInfo,
updated: obj, // returned as the object that will be updated
}
}
return d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
}
// Delete overrides the default behavior of the Storage and delete from both the LegacyStorage and Storage.
func (d *DualWriter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
// Delete from storage *first* so the item is still exists if a failure happens
obj, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err == nil {
if legacy, ok := d.legacy.(rest.GracefulDeleter); ok {
obj, async, err = legacy.Delete(ctx, name, deleteValidation, options)
}
}
return obj, async, err
}
// DeleteCollection overrides the default behavior of the Storage and delete from both the LegacyStorage and Storage.
func (d *DualWriter) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
out, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err == nil {
if legacy, ok := d.legacy.(rest.CollectionDeleter); ok {
out, err = legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
}
}
return out, err
}
type updateWrapper struct {
upstream rest.UpdatedObjectInfo
updated runtime.Object
}
// Returns preconditions built from the updated object, if applicable.
// May return nil, or a preconditions object containing nil fields,
// if no preconditions can be determined from the updated object.
func (u *updateWrapper) Preconditions() *metav1.Preconditions {
return u.upstream.Preconditions()
}
// UpdatedObject returns the updated object, given a context and old object.
// The only time an empty oldObj should be passed in is if a "create on update" is occurring (there is no oldObj).
func (u *updateWrapper) UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error) {
return u.updated, nil
}

View File

@@ -19,6 +19,9 @@ import (
"k8s.io/client-go/tools/clientcmd"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/apiserver/builder"
grafanaresponsewriter "github.com/grafana/grafana/pkg/apiserver/endpoints/responsewriter"
filestorage "github.com/grafana/grafana/pkg/apiserver/storage/file"
"github.com/grafana/grafana/pkg/infra/appcontext"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
@@ -28,11 +31,8 @@ import (
"github.com/grafana/grafana/pkg/services/apiserver/aggregator"
"github.com/grafana/grafana/pkg/services/apiserver/auth/authenticator"
"github.com/grafana/grafana/pkg/services/apiserver/auth/authorizer"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
grafanaresponsewriter "github.com/grafana/grafana/pkg/services/apiserver/endpoints/responsewriter"
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
entitystorage "github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
filestorage "github.com/grafana/grafana/pkg/services/apiserver/storage/file"
"github.com/grafana/grafana/pkg/services/apiserver/utils"
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
"github.com/grafana/grafana/pkg/services/featuremgmt"
@@ -280,7 +280,15 @@ func (s *service) start(ctx context.Context) error {
}
// Add OpenAPI specs for each group+version
err := builder.SetupConfig(Scheme, serverConfig, builders)
err := builder.SetupConfig(
Scheme,
serverConfig,
builders,
s.cfg.BuildStamp,
s.cfg.BuildVersion,
s.cfg.BuildCommit,
s.cfg.BuildBranch,
)
if err != nil {
return err
}

View File

@@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/grafana/grafana/pkg/apis/datasource/v0alpha1"
"github.com/grafana/grafana/pkg/apiserver/builder"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/registry/apis/datasource"
"github.com/grafana/grafana/pkg/registry/apis/example"
@@ -16,7 +17,6 @@ import (
"github.com/grafana/grafana/pkg/registry/apis/query"
"github.com/grafana/grafana/pkg/registry/apis/query/runner"
"github.com/grafana/grafana/pkg/services/accesscontrol/actest"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
"github.com/grafana/grafana/pkg/services/apiserver/options"
"github.com/grafana/grafana/pkg/services/featuremgmt"

View File

@@ -1,542 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package file
import (
"context"
"errors"
"fmt"
"path/filepath"
"reflect"
"strings"
"sync"
"time"
"github.com/bwmarrin/snowflake"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/tools/cache"
)
const MaxUpdateAttempts = 30
var _ storage.Interface = (*Storage)(nil)
// Replace with: https://github.com/kubernetes/kubernetes/blob/v1.29.0-alpha.3/staging/src/k8s.io/apiserver/pkg/storage/errors.go#L28
// When we upgrade to 1.29
var errResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
// Storage implements storage.Interface and storage resources as JSON files on disk.
type Storage struct {
root string
resourcePrefix string
gr schema.GroupResource
codec runtime.Codec
keyFunc func(obj runtime.Object) (string, error)
newFunc func() runtime.Object
newListFunc func() runtime.Object
getAttrsFunc storage.AttrFunc
trigger storage.IndexerFuncs
indexers *cache.Indexers
watchSet *WatchSet
}
// ErrFileNotExists means the file doesn't actually exist.
var ErrFileNotExists = fmt.Errorf("file doesn't exist")
// ErrNamespaceNotExists means the directory for the namespace doesn't actually exist.
var ErrNamespaceNotExists = errors.New("namespace does not exist")
var (
node *snowflake.Node
once sync.Once
)
func getResourceVersion() (*uint64, error) {
var err error
once.Do(func() {
node, err = snowflake.NewNode(1)
})
if err != nil {
return nil, err
}
snowflakeNumber := node.Generate().Int64()
resourceVersion := uint64(snowflakeNumber)
return &resourceVersion, nil
}
// NewStorage instantiates a new Storage.
func NewStorage(
config *storagebackend.ConfigForResource,
resourcePrefix string,
keyFunc func(obj runtime.Object) (string, error),
newFunc func() runtime.Object,
newListFunc func() runtime.Object,
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs,
indexers *cache.Indexers,
) (storage.Interface, factory.DestroyFunc, error) {
root := config.Prefix
if err := ensureDir(root); err != nil {
return nil, func() {}, fmt.Errorf("could not establish a writable directory at path=%s", root)
}
ws := NewWatchSet()
return &Storage{
root: root,
resourcePrefix: resourcePrefix,
gr: config.GroupResource,
codec: config.Codec,
keyFunc: keyFunc,
newFunc: newFunc,
newListFunc: newListFunc,
getAttrsFunc: getAttrsFunc,
trigger: trigger,
indexers: indexers,
watchSet: ws,
}, func() {
ws.cleanupWatchers()
}, nil
}
// Returns Versioner associated with this storage.
func (s *Storage) Versioner() storage.Versioner {
return &storage.APIObjectVersioner{}
}
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
filename := s.filePath(key)
if exists(filename) {
return storage.NewKeyExistsError(key, 0)
}
dirname := filepath.Dir(filename)
if err := ensureDir(dirname); err != nil {
return err
}
generatedRV, err := getResourceVersion()
if err != nil {
return err
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return err
}
metaObj.SetSelfLink("")
if metaObj.GetResourceVersion() != "" {
return errResourceVersionSetOnCreate
}
if err := s.Versioner().UpdateObject(obj, *generatedRV); err != nil {
return err
}
if err := writeFile(s.codec, filename, obj); err != nil {
return err
}
// set a timer to delete the file after ttl seconds
if ttl > 0 {
time.AfterFunc(time.Second*time.Duration(ttl), func() {
if err := s.Delete(ctx, key, s.newFunc(), &storage.Preconditions{}, func(ctx context.Context, obj runtime.Object) error { return nil }, obj); err != nil {
panic(err)
}
})
}
if err := s.Get(ctx, key, storage.GetOptions{}, out); err != nil {
return err
}
s.watchSet.notifyWatchers(watch.Event{
Object: out.DeepCopyObject(),
Type: watch.Added,
})
return nil
}
// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
// current version of the object to avoid read operation from storage to get it.
// However, the implementations have to retry in case suggestion is stale.
func (s *Storage) Delete(
ctx context.Context,
key string,
out runtime.Object,
preconditions *storage.Preconditions,
validateDeletion storage.ValidateObjectFunc,
cachedExistingObject runtime.Object,
) error {
filename := s.filePath(key)
var currentState runtime.Object
var stateIsCurrent bool
if cachedExistingObject != nil {
currentState = cachedExistingObject
} else {
getOptions := storage.GetOptions{}
if preconditions != nil && preconditions.ResourceVersion != nil {
getOptions.ResourceVersion = *preconditions.ResourceVersion
}
if err := s.Get(ctx, key, getOptions, currentState); err == nil {
stateIsCurrent = true
}
}
for {
if preconditions != nil {
if err := preconditions.Check(key, out); err != nil {
if stateIsCurrent {
return err
}
// If the state is not current, we need to re-read the state and try again.
if err := s.Get(ctx, key, storage.GetOptions{}, currentState); err != nil {
return err
}
stateIsCurrent = true
continue
}
}
if err := validateDeletion(ctx, out); err != nil {
if stateIsCurrent {
return err
}
// If the state is not current, we need to re-read the state and try again.
if err := s.Get(ctx, key, storage.GetOptions{}, currentState); err == nil {
stateIsCurrent = true
}
continue
}
if err := s.Get(ctx, key, storage.GetOptions{}, out); err != nil {
return err
}
generatedRV, err := getResourceVersion()
if err != nil {
return err
}
if err := s.Versioner().UpdateObject(out, *generatedRV); err != nil {
return err
}
if err := deleteFile(filename); err != nil {
return err
}
s.watchSet.notifyWatchers(watch.Event{
Object: out.DeepCopyObject(),
Type: watch.Deleted,
})
return nil
}
}
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items selected by 'p' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching,
// which should be the current resourceVersion, and no longer rv+1
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
p := opts.Predicate
jw := s.watchSet.newWatch()
listObj := s.newListFunc()
if opts.ResourceVersion == "0" {
err := s.GetList(ctx, key, opts, listObj)
if err != nil {
return nil, err
}
}
initEvents := make([]watch.Event, 0)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return nil, err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil {
return nil, err
}
if v.IsNil() {
jw.Start(p, initEvents)
return jw, nil
}
for _, obj := range v.Elem().Interface().([]runtime.Object) {
initEvents = append(initEvents, watch.Event{
Type: watch.Added,
Object: obj.DeepCopyObject(),
})
}
jw.Start(p, initEvents)
return jw, nil
}
// Get unmarshals object found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
filename := s.filePath(key)
obj, err := readFile(s.codec, filename, func() runtime.Object {
return objPtr
})
if err != nil {
if opts.IgnoreNotFound {
return runtime.SetZeroValue(objPtr)
}
rv, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
return storage.NewKeyNotFoundError(key, int64(rv))
}
currentVersion, err := s.Versioner().ObjectResourceVersion(obj)
if err != nil {
return err
}
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil {
return err
}
return nil
}
// GetList unmarshalls objects found at key into a *List api object (an object
// that satisfies runtime.IsList definition).
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
// is true, 'key' is used as a prefix.
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
generatedRV, err := getResourceVersion()
if err != nil {
return err
}
remainingItems := int64(0)
if err := s.Versioner().UpdateList(listObj, *generatedRV, "", &remainingItems); err != nil {
return err
}
// TODO: hack the resource version for now
// Watch is failing when set the list resourceVersion to 0, even though informers provide that in the opts
if opts.ResourceVersion == "0" {
opts.ResourceVersion = "1"
}
if opts.ResourceVersion != "" {
resourceVersionInt, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
if err := s.Versioner().UpdateList(listObj, resourceVersionInt, "", &remainingItems); err != nil {
return err
}
}
dirname := s.dirPath(key)
objs, err := readDirRecursive(s.codec, dirname, s.newFunc)
if err != nil {
return err
}
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
for _, obj := range objs {
currentVersion, err := s.Versioner().ObjectResourceVersion(obj)
if err != nil {
return err
}
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil {
continue
}
ok, err := opts.Predicate.Matches(obj)
if err == nil && ok {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
}
return nil
}
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
// retrying the update until success if there is index conflict.
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
// else `destination` will be set to the zero value of it's type.
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
// contents.
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
// current version of the object to avoid read operation from storage to get it.
// However, the implementations have to retry in case suggestion is stale.
func (s *Storage) GuaranteedUpdate(
ctx context.Context,
key string,
destination runtime.Object,
ignoreNotFound bool,
preconditions *storage.Preconditions,
tryUpdate storage.UpdateFunc,
cachedExistingObject runtime.Object,
) error {
var res storage.ResponseMeta
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 {
var (
filename = s.filePath(key)
obj runtime.Object
err error
created bool
)
if !exists(filename) && !ignoreNotFound {
return apierrors.NewNotFound(s.gr, s.nameFromKey(key))
}
obj, err = readFile(s.codec, filename, s.newFunc)
if err != nil {
// fallback to new object if the file is not found
obj = s.newFunc()
created = true
}
if err := preconditions.Check(key, obj); err != nil {
if attempt >= MaxUpdateAttempts {
return fmt.Errorf("precondition failed: %w", err)
}
continue
}
updatedObj, _, err := tryUpdate(obj, res)
if err != nil {
if attempt >= MaxUpdateAttempts {
return err
}
continue
}
unchanged, err := isUnchanged(s.codec, obj, updatedObj)
if err != nil {
return err
}
if unchanged {
u, err := conversion.EnforcePtr(updatedObj)
if err != nil {
return fmt.Errorf("unable to enforce updated object pointer: %w", err)
}
d, err := conversion.EnforcePtr(destination)
if err != nil {
return fmt.Errorf("unable to enforce destination pointer: %w", err)
}
d.Set(u)
return nil
}
generatedRV, err := getResourceVersion()
if err != nil {
return err
}
if err := s.Versioner().UpdateObject(updatedObj, *generatedRV); err != nil {
return err
}
if err := writeFile(s.codec, filename, updatedObj); err != nil {
return err
}
eventType := watch.Modified
if created {
eventType = watch.Added
}
s.watchSet.notifyWatchers(watch.Event{
Object: updatedObj.DeepCopyObject(),
Type: eventType,
})
}
return nil
}
// Count returns number of different entries under the key (generally being path prefix).
func (s *Storage) Count(key string) (int64, error) {
return 0, nil
}
// RequestWatchProgress requests the a watch stream progress status be sent in the
// watch response stream as soon as possible.
// Used for monitor watch progress even if watching resources with no changes.
//
// If watch is lagging, progress status might:
// * be pointing to stale resource version. Use etcd KV request to get linearizable resource version.
// * not be delivered at all. It's recommended to poll request progress periodically.
//
// Note: Only watches with matching context grpc metadata will be notified.
// https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
//
// TODO: Remove when storage.Interface will be separate from etc3.store.
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
func (s *Storage) RequestWatchProgress(ctx context.Context) error {
return nil
}
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
// greater than the most recent actualRevision available from storage.
func (s *Storage) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
if minimumResourceVersion == "" {
return nil
}
minimumRV, err := s.Versioner().ParseResourceVersion(minimumResourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
// Enforce the storage.Interface guarantee that the resource version of the returned data
// "will be at least 'resourceVersion'".
if minimumRV > actualRevision {
return storage.NewTooLargeResourceVersionError(minimumRV, actualRevision, 0)
}
return nil
}
func (s *Storage) nameFromKey(key string) string {
return strings.Replace(key, s.resourcePrefix+"/", "", 1)
}

View File

@@ -1,62 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only
package file
import (
"os"
"path/filepath"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage/storagebackend"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
)
var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)
type RESTOptionsGetter struct {
path string
original storagebackend.Config
}
func NewRESTOptionsGetter(path string, originalStorageConfig storagebackend.Config) *RESTOptionsGetter {
if path == "" {
path = filepath.Join(os.TempDir(), "grafana-apiserver")
}
return &RESTOptionsGetter{path: path, original: originalStorageConfig}
}
func (r *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
storageConfig := &storagebackend.ConfigForResource{
Config: storagebackend.Config{
Type: "file",
Prefix: r.path,
Transport: storagebackend.TransportConfig{},
Codec: r.original.Codec,
EncodeVersioner: r.original.EncodeVersioner,
Transformer: r.original.Transformer,
CompactionInterval: 0,
CountMetricPollPeriod: 0,
DBMetricPollInterval: 0,
HealthcheckTimeout: 0,
ReadycheckTimeout: 0,
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
},
GroupResource: resource,
}
ret := generic.RESTOptions{
StorageConfig: storageConfig,
Decorator: NewStorage,
DeleteCollectionWorkers: 0,
EnableGarbageCollection: false,
// k8s expects forward slashes here, we'll convert them to os path separators in the storage
ResourcePrefix: "/" + resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: 1 * time.Second,
StorageObjectCountTracker: storageConfig.Config.StorageObjectCountTracker,
}
return ret, nil
}

View File

@@ -1,106 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package file
import (
"bytes"
"errors"
"os"
"path/filepath"
"strings"
"k8s.io/apimachinery/pkg/runtime"
)
func (s *Storage) filePath(key string) string {
// Replace backslashes with underscores to avoid creating bogus subdirectories
key = strings.Replace(key, "\\", "_", -1)
fileName := filepath.Join(s.root, filepath.Clean(key+".json"))
return fileName
}
func (s *Storage) dirPath(key string) string {
// Replace backslashes with underscores to avoid creating bogus subdirectories
key = strings.Replace(key, "\\", "_", -1)
dirName := filepath.Join(s.root, filepath.Clean(key))
return dirName
}
func writeFile(codec runtime.Codec, path string, obj runtime.Object) error {
buf := new(bytes.Buffer)
if err := codec.Encode(obj, buf); err != nil {
return err
}
return os.WriteFile(path, buf.Bytes(), 0600)
}
func readFile(codec runtime.Codec, path string, newFunc func() runtime.Object) (runtime.Object, error) {
content, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return nil, err
}
newObj := newFunc()
decodedObj, _, err := codec.Decode(content, nil, newObj)
if err != nil {
return nil, err
}
return decodedObj, nil
}
func readDirRecursive(codec runtime.Codec, path string, newFunc func() runtime.Object) ([]runtime.Object, error) {
var objs []runtime.Object
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() || filepath.Ext(path) != ".json" {
return nil
}
obj, err := readFile(codec, path, newFunc)
if err != nil {
return err
}
objs = append(objs, obj)
return nil
})
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return objs, nil
}
return nil, err
}
return objs, nil
}
func deleteFile(path string) error {
return os.Remove(path)
}
func exists(filepath string) bool {
_, err := os.Stat(filepath)
return err == nil
}
func ensureDir(dirname string) error {
if !exists(dirname) {
return os.MkdirAll(dirname, 0700)
}
return nil
}
func isUnchanged(codec runtime.Codec, obj runtime.Object, newObj runtime.Object) (bool, error) {
buf := new(bytes.Buffer)
if err := codec.Encode(obj, buf); err != nil {
return false, err
}
newBuf := new(bytes.Buffer)
if err := codec.Encode(newObj, newBuf); err != nil {
return false, err
}
return bytes.Equal(buf.Bytes(), newBuf.Bytes()), nil
}

View File

@@ -1,103 +0,0 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/tilt-dev/tilt-apiserver/blob/main/pkg/storage/filepath/watchset.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Kubernetes Authors.
package file
import (
"sync"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
)
// Keeps track of which watches need to be notified
type WatchSet struct {
mu sync.RWMutex
nodes map[int]*watchNode
counter int
}
func NewWatchSet() *WatchSet {
return &WatchSet{
nodes: make(map[int]*watchNode, 20),
counter: 0,
}
}
// Creates a new watch with a unique id, but
// does not start sending events to it until start() is called.
func (s *WatchSet) newWatch() *watchNode {
s.mu.Lock()
defer s.mu.Unlock()
s.counter++
return &watchNode{
id: s.counter,
s: s,
updateCh: make(chan watch.Event),
outCh: make(chan watch.Event),
}
}
func (s *WatchSet) cleanupWatchers() {
// Doesn't protect the below access on nodes slice since it won't ever be modified during cleanup
for _, w := range s.nodes {
w.Stop()
}
}
func (s *WatchSet) notifyWatchers(ev watch.Event) {
s.mu.RLock()
for _, w := range s.nodes {
w.updateCh <- ev
}
s.mu.RUnlock()
}
type watchNode struct {
s *WatchSet
id int
updateCh chan watch.Event
outCh chan watch.Event
}
// Start sending events to this watch.
func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event) {
w.s.mu.Lock()
w.s.nodes[w.id] = w
w.s.mu.Unlock()
go func() {
for _, e := range initEvents {
w.outCh <- e
}
for e := range w.updateCh {
ok, err := p.Matches(e.Object)
if err != nil {
continue
}
if !ok {
continue
}
w.outCh <- e
}
close(w.outCh)
}()
}
func (w *watchNode) Stop() {
w.s.mu.Lock()
delete(w.s.nodes, w.id)
w.s.mu.Unlock()
close(w.updateCh)
}
func (w *watchNode) ResultChan() <-chan watch.Event {
return w.outCh
}

View File

@@ -3,7 +3,7 @@ package apiserver
import (
"github.com/google/wire"
"github.com/grafana/grafana/pkg/services/apiserver/builder"
"github.com/grafana/grafana/pkg/apiserver/builder"
)
var WireSet = wire.NewSet(