mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
K8s: add aggregator metrics (#94579)
This commit is contained in:
parent
644a16048f
commit
b4234cb06e
@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/klog/v2"
|
||||
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
||||
@ -45,15 +46,17 @@ import (
|
||||
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
|
||||
|
||||
servicev0alpha1 "github.com/grafana/grafana/pkg/apis/service/v0alpha1"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/service"
|
||||
|
||||
servicev0alpha1applyconfiguration "github.com/grafana/grafana/pkg/generated/applyconfiguration/service/v0alpha1"
|
||||
serviceclientset "github.com/grafana/grafana/pkg/generated/clientset/versioned"
|
||||
informersv0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions"
|
||||
"github.com/grafana/grafana/pkg/registry/apis/service"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/builder"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/options"
|
||||
)
|
||||
|
||||
// making sure we only register metrics once into legacy registry
|
||||
var registerIntoLegacyRegistryOnce sync.Once
|
||||
|
||||
func readCABundlePEM(path string, devMode bool) ([]byte, error) {
|
||||
if devMode {
|
||||
return nil, nil
|
||||
@ -260,6 +263,15 @@ func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.D
|
||||
}
|
||||
}
|
||||
|
||||
metrics := newAvailabilityMetrics()
|
||||
|
||||
// create shared (remote and local) availability metrics
|
||||
// TODO: decouple from legacyregistry
|
||||
registerIntoLegacyRegistryOnce.Do(func() { err = metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister) })
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
availableController, err := NewAvailableConditionController(
|
||||
aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
|
||||
externalNamesInformer,
|
||||
@ -267,6 +279,7 @@ func CreateAggregatorServer(config *Config, delegateAPIServer genericapiserver.D
|
||||
nil,
|
||||
proxyCurrentCertKeyContentFunc,
|
||||
completedConfig.ExtraConfig.ServiceResolver,
|
||||
metrics,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -17,7 +17,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/apis/service/v0alpha1"
|
||||
informersservicev0alpha1 "github.com/grafana/grafana/pkg/generated/informers/externalversions/service/v0alpha1"
|
||||
listersservicev0alpha1 "github.com/grafana/grafana/pkg/generated/listers/service/v0alpha1"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
@ -69,6 +68,7 @@ type AvailableConditionController struct {
|
||||
cache map[string]map[string][]string
|
||||
// this lock protects operations on the above cache
|
||||
cacheLock sync.RWMutex
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
// NewAvailableConditionController returns a new AvailableConditionController.
|
||||
@ -79,6 +79,7 @@ func NewAvailableConditionController(
|
||||
proxyTransportDial *transport.DialHolder,
|
||||
proxyCurrentCertKeyContent certKeyFunc,
|
||||
serviceResolver ServiceResolver,
|
||||
metrics *Metrics,
|
||||
) (*AvailableConditionController, error) {
|
||||
c := &AvailableConditionController{
|
||||
apiServiceClient: apiServiceClient,
|
||||
@ -94,6 +95,7 @@ func NewAvailableConditionController(
|
||||
),
|
||||
proxyTransportDial: proxyTransportDial,
|
||||
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
|
||||
metrics: metrics,
|
||||
}
|
||||
|
||||
// resync on this one because it is low cardinality and rechecking the actual discovery
|
||||
@ -124,6 +126,7 @@ func NewAvailableConditionController(
|
||||
func (c *AvailableConditionController) sync(key string) error {
|
||||
originalAPIService, err := c.apiServiceLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
c.metrics.ForgetAPIService(key)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
@ -225,7 +228,7 @@ func (c *AvailableConditionController) sync(key string) error {
|
||||
_ = resp.Body.Close()
|
||||
// we should always been in the 200s or 300s
|
||||
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
|
||||
errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode)
|
||||
errCh <- fmt.Errorf("bad status from %v: %d", discoveryURL, resp.StatusCode)
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -285,6 +288,9 @@ func (c *AvailableConditionController) sync(key string) error {
|
||||
// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
|
||||
// apiservices. Doing that means we don't want to quickly issue no-op updates.
|
||||
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
|
||||
// update this metric on every sync operation to reflect the actual state
|
||||
c.metrics.SetUnavailableGauge(newAPIService)
|
||||
|
||||
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
|
||||
return newAPIService, nil
|
||||
}
|
||||
@ -310,6 +316,7 @@ func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.metrics.SetUnavailableCounter(originalAPIService, newAPIService)
|
||||
return newAPIService, nil
|
||||
}
|
||||
|
||||
|
170
pkg/services/apiserver/aggregator/metrics.go
Normal file
170
pkg/services/apiserver/aggregator/metrics.go
Normal file
@ -0,0 +1,170 @@
|
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package aggregator
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/component-base/metrics"
|
||||
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
|
||||
)
|
||||
|
||||
/*
|
||||
* By default, all the following metrics are defined as falling under
|
||||
* ALPHA stability level https://github.com/kubernetes/enhancements/blob/master/keps/sig-instrumentation/1209-metrics-stability/kubernetes-control-plane-metrics-stability.md#stability-classes)
|
||||
*
|
||||
* Promoting the stability level of the metric is a responsibility of the component owner, since it
|
||||
* involves explicitly acknowledging support for the metric across multiple releases, in accordance with
|
||||
* the metric stability policy.
|
||||
*/
|
||||
var (
|
||||
unavailableGaugeDesc = metrics.NewDesc(
|
||||
"st_aggregator_unavailable_apiservice",
|
||||
"Gauge of Grafana APIServices which are marked as unavailable broken down by APIService name.",
|
||||
[]string{"name"},
|
||||
nil,
|
||||
metrics.ALPHA,
|
||||
"",
|
||||
)
|
||||
)
|
||||
|
||||
type Metrics struct {
|
||||
unavailableCounter *metrics.CounterVec
|
||||
|
||||
*availabilityCollector
|
||||
}
|
||||
|
||||
func newAvailabilityMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
unavailableCounter: metrics.NewCounterVec(
|
||||
&metrics.CounterOpts{
|
||||
// These metrics are registered in the main kube-aggregator package as well, prefixing with single-tenant (ST) to avoid
|
||||
// "duplicate metrics collector registration attempted" in https://github.com/prometheus/client_golang
|
||||
// a more descriptive prefix is already added for apiserver metrics during scraping in cloud and didn't want
|
||||
// to double a word by using a word such as "grafana" here
|
||||
Name: "st_aggregator_unavailable_apiservice_total",
|
||||
Help: "Counter of Grafana APIServices which are marked as unavailable broken down by APIService name and reason.",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"name", "reason"},
|
||||
),
|
||||
availabilityCollector: newAvailabilityCollector(),
|
||||
}
|
||||
}
|
||||
|
||||
// Register registers apiservice availability metrics.
|
||||
func (m *Metrics) Register(
|
||||
registrationFunc func(metrics.Registerable) error,
|
||||
customRegistrationFunc func(metrics.StableCollector) error,
|
||||
) error {
|
||||
err := registrationFunc(m.unavailableCounter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = customRegistrationFunc(m.availabilityCollector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnavailableCounter returns a counter to track apiservices marked as unavailable.
|
||||
func (m *Metrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric {
|
||||
return m.unavailableCounter.WithLabelValues(apiServiceName, reason)
|
||||
}
|
||||
|
||||
type availabilityCollector struct {
|
||||
metrics.BaseStableCollector
|
||||
|
||||
mtx sync.RWMutex
|
||||
availabilities map[string]bool
|
||||
}
|
||||
|
||||
// SetUnavailableGauge set the metrics so that it reflect the current state based on availability of the given service
|
||||
func (m *Metrics) SetUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
|
||||
if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) {
|
||||
m.SetAPIServiceAvailable(newAPIService.Name)
|
||||
return
|
||||
}
|
||||
|
||||
m.SetAPIServiceUnavailable(newAPIService.Name)
|
||||
}
|
||||
|
||||
// SetUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed
|
||||
func (m *Metrics) SetUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
|
||||
wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available)
|
||||
isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available)
|
||||
statusChanged := isAvailable != wasAvailable
|
||||
|
||||
if statusChanged && !isAvailable {
|
||||
reason := "UnknownReason"
|
||||
if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil {
|
||||
reason = newCondition.Reason
|
||||
}
|
||||
m.UnavailableCounter(newAPIService.Name, reason).Inc()
|
||||
}
|
||||
}
|
||||
|
||||
// Check if apiServiceStatusCollector implements necessary interface.
|
||||
var _ metrics.StableCollector = &availabilityCollector{}
|
||||
|
||||
func newAvailabilityCollector() *availabilityCollector {
|
||||
return &availabilityCollector{
|
||||
availabilities: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
// DescribeWithStability implements the metrics.StableCollector interface.
|
||||
func (c *availabilityCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
|
||||
ch <- unavailableGaugeDesc
|
||||
}
|
||||
|
||||
// CollectWithStability implements the metrics.StableCollector interface.
|
||||
func (c *availabilityCollector) CollectWithStability(ch chan<- metrics.Metric) {
|
||||
c.mtx.RLock()
|
||||
defer c.mtx.RUnlock()
|
||||
|
||||
for apiServiceName, isAvailable := range c.availabilities {
|
||||
gaugeValue := 1.0
|
||||
if isAvailable {
|
||||
gaugeValue = 0.0
|
||||
}
|
||||
ch <- metrics.NewLazyConstMetric(
|
||||
unavailableGaugeDesc,
|
||||
metrics.GaugeValue,
|
||||
gaugeValue,
|
||||
apiServiceName,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// SetAPIServiceAvailable sets the given apiservice availability gauge to available.
|
||||
func (c *availabilityCollector) SetAPIServiceAvailable(apiServiceKey string) {
|
||||
c.setAPIServiceAvailability(apiServiceKey, true)
|
||||
}
|
||||
|
||||
// SetAPIServiceUnavailable sets the given apiservice availability gauge to unavailable.
|
||||
func (c *availabilityCollector) SetAPIServiceUnavailable(apiServiceKey string) {
|
||||
c.setAPIServiceAvailability(apiServiceKey, false)
|
||||
}
|
||||
|
||||
func (c *availabilityCollector) setAPIServiceAvailability(apiServiceKey string, availability bool) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
c.availabilities[apiServiceKey] = availability
|
||||
}
|
||||
|
||||
// ForgetAPIService removes the availability gauge of the given apiservice.
|
||||
func (c *availabilityCollector) ForgetAPIService(apiServiceKey string) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
delete(c.availabilities, apiServiceKey)
|
||||
}
|
Loading…
Reference in New Issue
Block a user