grafana/pkg/storage/unified/resource/health.go
Georges Chaudy 4baca6947d
ResourceServer: make the resource store the default unified storage backend (#90899)
* make the resource store the default unified storage backend

* add integration tests

* fix test non passing

* Update pkg/storage/unified/sql/test/integration_test.go

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

* lint

* fix tests

* fix no rows

---------

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
2024-07-25 18:17:39 +02:00

89 lines
2.2 KiB
Go

package resource
import (
"context"
"errors"
"time"
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"google.golang.org/grpc/health/grpc_health_v1"
)
// Compile-time assertion
var _ HealthService = &healthServer{}
type HealthService interface {
grpc_health_v1.HealthServer
grpcAuth.ServiceAuthFuncOverride
}
func ProvideHealthService(server DiagnosticsServer) (grpc_health_v1.HealthServer, error) {
h := &healthServer{srv: server}
return h, nil
}
type healthServer struct {
srv DiagnosticsServer
}
// AuthFuncOverride for no auth for health service.
func (s *healthServer) AuthFuncOverride(ctx context.Context, _ string) (context.Context, error) {
return ctx, nil
}
func (s *healthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
r, err := s.srv.IsHealthy(ctx, &HealthCheckRequest{})
if err != nil {
return nil, err
}
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_ServingStatus(r.Status.Number()),
}, nil
}
func (s *healthServer) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
h, err := s.srv.IsHealthy(stream.Context(), &HealthCheckRequest{})
if err != nil {
return err
}
// send initial health status
err = stream.Send(&grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_ServingStatus(h.Status.Number()),
})
if err != nil {
return err
}
currHealth := h.Status.Number()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// get current health status
h, err := s.srv.IsHealthy(stream.Context(), &HealthCheckRequest{})
if err != nil {
return err
}
// if health status has not changed, continue
if h.Status.Number() == currHealth {
continue
}
// send the new health status
currHealth = h.Status.Number()
err = stream.Send(&grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_ServingStatus(h.Status.Number()),
})
if err != nil {
return err
}
case <-stream.Context().Done():
return errors.New("stream closed, context cancelled")
}
}
}