K8s: bug fixes for file storage to allow for watcher initialization on startup (#83873)

---------

Co-authored-by: Todd Treece <todd.treece@grafana.com>
This commit is contained in:
Charandas 2024-03-05 10:34:47 -08:00 committed by GitHub
parent 3f2820a552
commit e916372249
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 83 additions and 29 deletions

View File

@ -120,12 +120,12 @@ func (s *Storage) Versioner() storage.Versioner {
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error {
filename := s.filePath(key)
if exists(filename) {
fpath := s.filePath(key)
if exists(fpath) {
return storage.NewKeyExistsError(key, 0)
}
dirname := filepath.Dir(filename)
dirname := filepath.Dir(fpath)
if err := ensureDir(dirname); err != nil {
return err
}
@ -148,7 +148,7 @@ func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, ou
return err
}
if err := writeFile(s.codec, filename, obj); err != nil {
if err := writeFile(s.codec, fpath, obj); err != nil {
return err
}
@ -186,7 +186,7 @@ func (s *Storage) Delete(
validateDeletion storage.ValidateObjectFunc,
cachedExistingObject runtime.Object,
) error {
filename := s.filePath(key)
fpath := s.filePath(key)
var currentState runtime.Object
var stateIsCurrent bool
if cachedExistingObject != nil {
@ -241,7 +241,7 @@ func (s *Storage) Delete(
return err
}
if err := deleteFile(filename); err != nil {
if err := deleteFile(fpath); err != nil {
return err
}
@ -305,8 +305,15 @@ func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOption
// The returned contents may be delayed, but it is guaranteed that they will
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
filename := s.filePath(key)
obj, err := readFile(s.codec, filename, func() runtime.Object {
fpath := s.filePath(key)
// Since it's a get, check if the dir exists and return early as needed
dirname := filepath.Dir(fpath)
if !exists(dirname) {
return apierrors.NewNotFound(s.gr, s.nameFromKey(key))
}
obj, err := readFile(s.codec, fpath, func() runtime.Object {
return objPtr
})
if err != nil {
@ -364,9 +371,14 @@ func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOpti
}
}
dirname := s.dirPath(key)
dirpath := s.dirPath(key)
// Since it's a get, check if the dir exists and return early as needed
if !exists(dirpath) {
// ensure we return empty list in listObj insted of a not found error
return nil
}
objs, err := readDirRecursive(s.codec, dirname, s.newFunc)
objs, err := readDirRecursive(s.codec, dirpath, s.newFunc)
if err != nil {
return err
}
@ -424,18 +436,25 @@ func (s *Storage) GuaranteedUpdate(
var res storage.ResponseMeta
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 {
var (
filename = s.filePath(key)
fpath = s.filePath(key)
dirpath = filepath.Dir(fpath)
obj runtime.Object
err error
created bool
)
if !exists(filename) && !ignoreNotFound {
if !exists(dirpath) {
if err := ensureDir(dirpath); err != nil {
return err
}
}
if !exists(fpath) && !ignoreNotFound {
return apierrors.NewNotFound(s.gr, s.nameFromKey(key))
}
obj, err = readFile(s.codec, filename, s.newFunc)
obj, err = readFile(s.codec, fpath, s.newFunc)
if err != nil {
// fallback to new object if the file is not found
obj = s.newFunc()
@ -482,7 +501,7 @@ func (s *Storage) GuaranteedUpdate(
if err := s.Versioner().UpdateObject(updatedObj, *generatedRV); err != nil {
return err
}
if err := writeFile(s.codec, filename, updatedObj); err != nil {
if err := writeFile(s.codec, fpath, updatedObj); err != nil {
return err
}
eventType := watch.Modified

View File

@ -20,12 +20,30 @@ type RESTOptionsGetter struct {
original storagebackend.Config
}
func NewRESTOptionsGetter(path string, originalStorageConfig storagebackend.Config) *RESTOptionsGetter {
// Optionally, this constructor allows specifying directories
// for resources that are required to be read/watched on startup and there
// won't be any write operations that initially bootstrap their directories
func NewRESTOptionsGetter(path string,
originalStorageConfig storagebackend.Config,
createResourceDirs ...string) (*RESTOptionsGetter, error) {
if path == "" {
path = filepath.Join(os.TempDir(), "grafana-apiserver")
}
return &RESTOptionsGetter{path: path, original: originalStorageConfig}
if err := initializeDirs(path, createResourceDirs); err != nil {
return nil, err
}
return &RESTOptionsGetter{path: path, original: originalStorageConfig}, nil
}
func initializeDirs(root string, createResourceDirs []string) error {
for _, dir := range createResourceDirs {
if err := ensureDir(filepath.Join(root, dir)); err != nil {
return err
}
}
return nil
}
func (r *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {

View File

@ -22,11 +22,11 @@ func (s *Storage) filePath(key string) string {
return fileName
}
// this is for constructing dirPath in a sanitized way provided you have
// already calculated the key. In order to go in the other direction, from a file path
// key to its dir, use the go standard library: filepath.Dir
func (s *Storage) dirPath(key string) string {
// Replace backslashes with underscores to avoid creating bogus subdirectories
key = strings.Replace(key, "\\", "_", -1)
dirName := filepath.Join(s.root, filepath.Clean(key))
return dirName
return dirPath(s.root, key)
}
func writeFile(codec runtime.Codec, path string, obj runtime.Object) error {
@ -84,6 +84,13 @@ func exists(filepath string) bool {
return err == nil
}
func dirPath(root string, key string) string {
// Replace backslashes with underscores to avoid creating bogus subdirectories
key = strings.Replace(key, "\\", "_", -1)
dirName := filepath.Join(root, filepath.Clean(key))
return dirName
}
func ensureDir(dirname string) error {
if !exists(dirname) {
return os.MkdirAll(dirname, 0700)

View File

@ -48,7 +48,7 @@ func applyGrafanaConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles, o
o.RecommendedOptions.CoreAPI = nil
o.StorageOptions.StorageType = options.StorageType(apiserverCfg.Key("storage_type").MustString(string(options.StorageTypeLegacy)))
o.StorageOptions.DataPath = filepath.Join(cfg.DataPath, "grafana-apiserver")
o.StorageOptions.DataPath = apiserverCfg.Key("storage_path").MustString(filepath.Join(cfg.DataPath, "grafana-apiserver"))
o.ExtraOptions.DevMode = features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerEnsureKubectlAccess)
o.ExtraOptions.ExternalAddress = host
o.ExtraOptions.APIURL = apiURL

View File

@ -89,7 +89,14 @@ func (o *AggregatorServerOptions) ApplyTo(aggregatorConfig *aggregatorapiserver.
return err
}
// override the RESTOptionsGetter to use the file storage options getter
aggregatorConfig.GenericConfig.RESTOptionsGetter = filestorage.NewRESTOptionsGetter(dataPath, etcdOptions.StorageConfig)
restOptionsGetter, err := filestorage.NewRESTOptionsGetter(dataPath, etcdOptions.StorageConfig,
"apiregistration.k8s.io/apiservices",
"service.grafana.app/externalnames",
)
if err != nil {
return err
}
aggregatorConfig.GenericConfig.RESTOptionsGetter = restOptionsGetter
// prevent generic API server from installing the OpenAPI handler. Aggregator server has its own customized OpenAPI handler.
genericConfig.SkipOpenAPIInstallation = true

View File

@ -31,7 +31,7 @@ func NewStorageOptions() *StorageOptions {
func (o *StorageOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar((*string)(&o.StorageType), "grafana-apiserver-storage-type", string(o.StorageType), "Storage type")
fs.StringVar((*string)(&o.StorageType), "grafana-apiserver-storage-path", string(o.StorageType), "Storage path for file storage")
fs.StringVar(&o.DataPath, "grafana-apiserver-storage-path", o.DataPath, "Storage path for file storage")
}
func (o *StorageOptions) Validate() []error {

View File

@ -31,6 +31,7 @@ import (
"github.com/grafana/grafana/pkg/services/apiserver/aggregator"
"github.com/grafana/grafana/pkg/services/apiserver/auth/authenticator"
"github.com/grafana/grafana/pkg/services/apiserver/auth/authorizer"
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
grafanaapiserveroptions "github.com/grafana/grafana/pkg/services/apiserver/options"
entitystorage "github.com/grafana/grafana/pkg/services/apiserver/storage/entity"
"github.com/grafana/grafana/pkg/services/apiserver/utils"
@ -277,7 +278,11 @@ func (s *service) start(ctx context.Context) error {
case grafanaapiserveroptions.StorageTypeLegacy:
fallthrough
case grafanaapiserveroptions.StorageTypeFile:
serverConfig.RESTOptionsGetter = filestorage.NewRESTOptionsGetter(o.StorageOptions.DataPath, o.RecommendedOptions.Etcd.StorageConfig)
restOptionsGetter, err := filestorage.NewRESTOptionsGetter(o.StorageOptions.DataPath, o.RecommendedOptions.Etcd.StorageConfig)
if err != nil {
return err
}
serverConfig.RESTOptionsGetter = restOptionsGetter
}
// Add OpenAPI specs for each group+version
@ -367,11 +372,9 @@ func (s *service) startAggregator(
serverConfig *genericapiserver.RecommendedConfig,
server *genericapiserver.GenericAPIServer,
) (*genericapiserver.GenericAPIServer, error) {
externalNamesNamespace := "default"
if s.cfg.StackID != "" {
externalNamesNamespace = s.cfg.StackID
}
aggregatorConfig, err := aggregator.CreateAggregatorConfig(s.options, *serverConfig, externalNamesNamespace)
namespaceMapper := request.GetNamespaceMapper(s.cfg)
aggregatorConfig, err := aggregator.CreateAggregatorConfig(s.options, *serverConfig, namespaceMapper(1))
if err != nil {
return nil, err
}