CloudMonitoring: Move data manipulation to backend (#41379)

This commit is contained in:
Isabella Siu
2021-11-10 08:58:04 -05:00
committed by GitHub
parent a45e4ff73f
commit fc3d3ff003
12 changed files with 531 additions and 442 deletions

View File

@@ -3,7 +3,6 @@ package cloudmonitoring
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -16,8 +15,7 @@ import (
"strings"
"time"
"golang.org/x/oauth2/google"
"github.com/grafana/grafana-google-sdk-go/pkg/utils"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
@@ -87,6 +85,7 @@ func ProvideService(cfg *setting.Cfg, httpClientProvider httpclient.Provider, re
factory := coreplugin.New(backend.ServeOpts{
QueryDataHandler: s,
CallResourceHandler: httpadapter.New(mux),
CheckHealthHandler: s,
})
if err := registrar.LoadAndRegister(pluginID, factory); err != nil {
@@ -95,6 +94,45 @@ func ProvideService(cfg *setting.Cfg, httpClientProvider httpclient.Provider, re
return s
}
func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return nil, err
}
defaultProject, err := s.getDefaultProject(ctx, *dsInfo)
if err != nil {
return nil, err
}
url := fmt.Sprintf("%v/v3/projects/%v/metricDescriptors", dsInfo.services[cloudMonitor].url, defaultProject)
request, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
res, err := dsInfo.services[cloudMonitor].client.Do(request)
if err != nil {
return nil, err
}
defer func() {
if err := res.Body.Close(); err != nil {
slog.Warn("Failed to close response body", "err", err)
}
}()
status := backend.HealthStatusOk
message := "Successfully queried the Google Cloud Monitoring API."
if res.StatusCode != 200 {
status = backend.HealthStatusError
message = res.Status
}
return &backend.CheckHealthResult{
Status: status,
Message: message,
}, nil
}
type Service struct {
httpClientProvider httpclient.Provider
cfg *setting.Cfg
@@ -206,8 +244,6 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
switch model.Type {
case "annotationQuery":
resp, err = s.executeAnnotationQuery(ctx, req, *dsInfo)
case "getGCEDefaultProject":
resp, err = s.getGCEDefaultProject(ctx, req, *dsInfo)
case "timeSeriesQuery":
fallthrough
default:
@@ -217,26 +253,6 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return resp, err
}
func (s *Service) getGCEDefaultProject(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) (*backend.QueryDataResponse, error) {
gceDefaultProject, err := s.getDefaultProject(ctx, dsInfo)
if err != nil {
return backend.NewQueryDataResponse(), fmt.Errorf(
"failed to retrieve default project from GCE metadata server, error: %w", err)
}
return &backend.QueryDataResponse{
Responses: backend.Responses{
req.Queries[0].RefID: {
Frames: data.Frames{data.NewFrame("").SetMeta(&data.FrameMeta{
Custom: map[string]interface{}{
"defaultProject": gceDefaultProject,
},
})},
},
},
}, nil
}
func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo datasourceInfo) (
*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
@@ -606,19 +622,7 @@ func (s *Service) createRequest(ctx context.Context, dsInfo *datasourceInfo, pro
func (s *Service) getDefaultProject(ctx context.Context, dsInfo datasourceInfo) (string, error) {
if dsInfo.authenticationType == gceAuthentication {
defaultCredentials, err := google.FindDefaultCredentials(ctx, "https://www.googleapis.com/auth/monitoring.read")
if err != nil {
return "", fmt.Errorf("failed to retrieve default project from GCE metadata server: %w", err)
}
token, err := defaultCredentials.TokenSource.Token()
if err != nil {
return "", fmt.Errorf("failed to retrieve GCP credential token: %w", err)
}
if !token.Valid() {
return "", errors.New("failed to validate GCP credentials")
}
return defaultCredentials.ProjectID, nil
return utils.GCEDefaultProject(ctx)
}
return dsInfo.defaultProject, nil
}

View File

@@ -1,31 +1,244 @@
package cloudmonitoring
import (
"bytes"
"compress/flate"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"regexp"
"strings"
"github.com/andybalholm/brotli"
"github.com/grafana/grafana-google-sdk-go/pkg/utils"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
)
// nameExp matches the part after the last '/' symbol
var nameExp = regexp.MustCompile(`([^\/]*)\/*$`)
const resourceManagerPath = "/v1/projects"
type processResponse func(body []byte) ([]byte, error)
func (s *Service) registerRoutes(mux *http.ServeMux) {
mux.HandleFunc("/cloudmonitoring/", s.resourceHandler(cloudMonitor))
mux.HandleFunc("/cloudresourcemanager/", s.resourceHandler(resourceManager))
mux.HandleFunc("/gceDefaultProject", getGCEDefaultProject)
mux.HandleFunc("/metricDescriptors/", s.resourceHandler(cloudMonitor, processMetricDescriptors))
mux.HandleFunc("/services/", s.resourceHandler(cloudMonitor, processServices))
mux.HandleFunc("/slo-services/", s.resourceHandler(cloudMonitor, processSLOs))
mux.HandleFunc("/projects", s.resourceHandler(resourceManager, processProjects))
}
func (s *Service) resourceHandler(subDataSource string) func(rw http.ResponseWriter, req *http.Request) {
func getGCEDefaultProject(rw http.ResponseWriter, req *http.Request) {
project, err := utils.GCEDefaultProject(req.Context())
if err != nil {
writeResponse(rw, http.StatusBadRequest, fmt.Sprintf("unexpected error %v", err))
return
}
writeResponse(rw, http.StatusOK, project)
}
func (s *Service) resourceHandler(subDataSource string, responseFn processResponse) func(rw http.ResponseWriter, req *http.Request) {
return func(rw http.ResponseWriter, req *http.Request) {
client, code, err := s.setRequestVariables(req, subDataSource)
if err != nil {
writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err))
return
}
doRequest(rw, req, client)
s.doRequest(rw, req, client, responseFn)
}
}
func (s *Service) doRequest(rw http.ResponseWriter, req *http.Request, cli *http.Client, responseFn processResponse) http.ResponseWriter {
res, err := cli.Do(req)
if err != nil {
writeResponse(rw, http.StatusBadRequest, fmt.Sprintf("unexpected error %v", err))
return rw
}
defer func() {
if err := res.Body.Close(); err != nil {
slog.Warn("Failed to close response body", "err", err)
}
}()
if responseFn == nil {
writeResponse(rw, http.StatusInternalServerError, "responseFn should not be nil")
return rw
}
body, code, err := processData(res, responseFn)
if err != nil {
writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err))
return rw
}
writeResponseBytes(rw, res.StatusCode, body)
for k, v := range res.Header {
rw.Header().Set(k, v[0])
for _, v := range v[1:] {
rw.Header().Add(k, v)
}
}
return rw
}
func processMetricDescriptors(body []byte) ([]byte, error) {
resp := metricDescriptorResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
for i := range resp.Descriptors {
resp.Descriptors[i].Service = strings.SplitN(resp.Descriptors[i].Type, "/", 2)[0]
resp.Descriptors[i].ServiceShortName = strings.SplitN(resp.Descriptors[i].Service, ".", 2)[0]
if resp.Descriptors[i].DisplayName == "" {
resp.Descriptors[i].DisplayName = resp.Descriptors[i].Type
}
}
return json.Marshal(resp.Descriptors)
}
func processServices(body []byte) ([]byte, error) {
resp := serviceResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
values := []selectableValue{}
for _, service := range resp.Services {
name := nameExp.FindString(service.Name)
if name == "" {
return nil, fmt.Errorf("unexpected service name: %v", service.Name)
}
label := service.DisplayName
if label == "" {
label = name
}
values = append(values, selectableValue{
Value: name,
Label: label,
})
}
return json.Marshal(values)
}
func processSLOs(body []byte) ([]byte, error) {
resp := sloResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
values := []selectableValue{}
for _, slo := range resp.SLOs {
name := nameExp.FindString(slo.Name)
if name == "" {
return nil, fmt.Errorf("unexpected service name: %v", slo.Name)
}
values = append(values, selectableValue{
Value: name,
Label: slo.DisplayName,
Goal: slo.Goal,
})
}
return json.Marshal(values)
}
func processProjects(body []byte) ([]byte, error) {
resp := projectResponse{}
err := json.Unmarshal(body, &resp)
if err != nil {
return nil, err
}
values := []selectableValue{}
for _, project := range resp.Projects {
values = append(values, selectableValue{
Value: project.ProjectID,
Label: project.Name,
})
}
return json.Marshal(values)
}
func processData(res *http.Response, responseFn processResponse) ([]byte, int, error) {
encoding := res.Header.Get("Content-Encoding")
var reader io.Reader
var err error
switch encoding {
case "gzip":
reader, err = gzip.NewReader(res.Body)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err)
}
defer func() {
if err := reader.(io.ReadCloser).Close(); err != nil {
slog.Warn("Failed to close reader body", "err", err)
}
}()
case "deflate":
reader = flate.NewReader(res.Body)
defer func() {
if err := reader.(io.ReadCloser).Close(); err != nil {
slog.Warn("Failed to close reader body", "err", err)
}
}()
case "br":
reader = brotli.NewReader(res.Body)
case "":
reader = res.Body
default:
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", err)
}
body, err := ioutil.ReadAll(reader)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err)
}
body, err = responseFn(body)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("data processing error %v", err)
}
buf := new(bytes.Buffer)
var writer io.Writer = buf
switch encoding {
case "gzip":
writer = gzip.NewWriter(writer)
case "deflate":
writer, err = flate.NewWriter(writer, -1)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected error %v", err)
}
case "br":
writer = brotli.NewWriter(writer)
case "":
default:
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", encoding)
}
_, err = writer.Write(body)
if writeCloser, ok := writer.(io.WriteCloser); ok {
if err := writeCloser.Close(); err != nil {
slog.Warn("Failed to close writer body", "err", err)
}
}
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("unable to encode response %v", err)
}
return buf.Bytes(), 0, nil
}
func (s *Service) setRequestVariables(req *http.Request, subDataSource string) (*http.Client, int, error) {
slog.Debug("Received resource call", "url", req.URL.String(), "method", req.Method)
@@ -50,48 +263,10 @@ func (s *Service) setRequestVariables(req *http.Request, subDataSource string) (
return dsInfo.services[subDataSource].client, 0, nil
}
func doRequest(rw http.ResponseWriter, req *http.Request, cli *http.Client) http.ResponseWriter {
res, err := cli.Do(req)
if err != nil {
rw.WriteHeader(http.StatusBadRequest)
_, err = rw.Write([]byte(fmt.Sprintf("unexpected error %v", err)))
if err != nil {
slog.Error("Unable to write HTTP response", "error", err)
}
return nil
}
defer func() {
if err := res.Body.Close(); err != nil {
slog.Warn("Failed to close response body", "err", err)
}
}()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
_, err = rw.Write([]byte(fmt.Sprintf("unexpected error %v", err)))
if err != nil {
slog.Error("Unable to write HTTP response", "error", err)
}
return nil
}
rw.WriteHeader(res.StatusCode)
_, err = rw.Write(body)
if err != nil {
slog.Error("Unable to write HTTP response", "error", err)
}
for k, v := range res.Header {
rw.Header().Set(k, v[0])
for _, v := range v[1:] {
rw.Header().Add(k, v)
}
}
// Returning the response write for testing purposes
return rw
}
func getTarget(original string) (target string, err error) {
if original == "/projects" {
return resourceManagerPath, nil
}
splittedPath := strings.SplitN(original, "/", 3)
if len(splittedPath) < 3 {
err = fmt.Errorf("the request should contain the service on its path")
@@ -101,14 +276,18 @@ func getTarget(original string) (target string, err error) {
return
}
func writeResponse(rw http.ResponseWriter, code int, msg string) {
func writeResponseBytes(rw http.ResponseWriter, code int, msg []byte) {
rw.WriteHeader(code)
_, err := rw.Write([]byte(msg))
_, err := rw.Write(msg)
if err != nil {
slog.Error("Unable to write HTTP response", "error", err)
}
}
func writeResponse(rw http.ResponseWriter, code int, msg string) {
writeResponseBytes(rw, code, []byte(msg))
}
func (s *Service) getDataSourceFromHTTPReq(req *http.Request) (*datasourceInfo, error) {
ctx := req.Context()
pluginContext := httpadapter.PluginConfigFromContext(ctx)

View File

@@ -1,6 +1,7 @@
package cloudmonitoring
import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
@@ -42,6 +43,10 @@ func Test_parseResourcePath(t *testing.T) {
}
}
func fakeResponseFn(input []byte) ([]byte, error) {
return input, nil
}
func Test_doRequest(t *testing.T) {
// test that it forwards the header and body
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -55,8 +60,10 @@ func Test_doRequest(t *testing.T) {
if err != nil {
t.Error(err)
}
s := Service{}
rw := httptest.NewRecorder()
res := doRequest(rw, req, srv.Client())
res := s.doRequest(rw, req, srv.Client(), fakeResponseFn)
if res.Header().Get("foo") != "bar" {
t.Errorf("Unexpected headers: %v", res.Header())
}
@@ -112,3 +119,149 @@ func Test_setRequestVariables(t *testing.T) {
t.Errorf("Unexpected result URL. Got %s, expecting %s", req.URL.String(), expectedURL)
}
}
func Test_processData_functions(t *testing.T) {
// metricDescriptors
metricDescriptorResp := metricDescriptorResponse{
Descriptors: []metricDescriptor{
{
ValueType: "INT64",
MetricKind: "DELTA",
Type: "actions.googleapis.com/smarthome_action/local_event_count",
Unit: "1",
Service: "foo",
ServiceShortName: "bar",
DisplayName: "Local event count",
Description: "baz",
},
},
}
marshaledMDResponse, _ := json.Marshal(metricDescriptorResp)
metricDescriptorResult := []metricDescriptor{
{
ValueType: "INT64",
MetricKind: "DELTA",
Type: "actions.googleapis.com/smarthome_action/local_event_count",
Unit: "1",
Service: "actions.googleapis.com",
ServiceShortName: "actions",
DisplayName: "Local event count",
Description: "baz",
},
}
marshaledMDResult, _ := json.Marshal(metricDescriptorResult)
// services
serviceResp := serviceResponse{
Services: []serviceDescription{
{
Name: "blah/foo",
DisplayName: "bar",
},
{
Name: "abc",
DisplayName: "",
},
},
}
marshaledServiceResponse, _ := json.Marshal(serviceResp)
serviceResult := []selectableValue{
{
Value: "foo",
Label: "bar",
},
{
Value: "abc",
Label: "abc",
},
}
marshaledServiceResult, _ := json.Marshal(serviceResult)
// slos
sloResp := sloResponse{
SLOs: []sloDescription{
{
Name: "blah/foo",
DisplayName: "bar",
Goal: 0.1,
},
{
Name: "abc",
DisplayName: "xyz",
Goal: 0.2,
},
},
}
marshaledSLOResponse, _ := json.Marshal(sloResp)
sloResult := []selectableValue{
{
Value: "foo",
Label: "bar",
Goal: 0.1,
},
{
Value: "abc",
Label: "xyz",
Goal: 0.2,
},
}
marshaledSLOResult, _ := json.Marshal(sloResult)
// cloudresourcemanager
cloudResourceResp := projectResponse{
Projects: []projectDescription{
{
ProjectID: "foo",
Name: "bar",
},
{
ProjectID: "abc",
Name: "abc",
},
},
}
marshaledCRResponse, _ := json.Marshal(cloudResourceResp)
tests := []struct {
name string
responseFn processResponse
input []byte
result []byte
}{
{
"metricDescriptor",
processMetricDescriptors,
marshaledMDResponse,
marshaledMDResult,
},
{
"services",
processServices,
marshaledServiceResponse,
marshaledServiceResult,
},
{
"slos",
processSLOs,
marshaledSLOResponse,
marshaledSLOResult,
},
{
"cloudresourcemanager",
processProjects,
marshaledCRResponse,
marshaledServiceResult,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
res, err := test.responseFn(test.input)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if string(test.result) != string(res) {
t.Errorf("Unexpected result. Got %s, expecting %s", res, test.result)
}
})
}
}

View File

@@ -189,3 +189,50 @@ type timeSeries struct {
} `json:"value"`
} `json:"points"`
}
type metricDescriptorResponse struct {
Descriptors []metricDescriptor `json:"metricDescriptors"`
}
type metricDescriptor struct {
ValueType string `json:"valueType"`
MetricKind string `json:"metricKind"`
Type string `json:"type"`
Unit string `json:"unit"`
Service string `json:"service"`
ServiceShortName string `json:"serviceShortName"`
DisplayName string `json:"displayName"`
Description string `json:"description"`
}
type projectResponse struct {
Projects []projectDescription `json:"projects"`
}
type projectDescription struct {
ProjectID string `json:"projectId"`
Name string `json:"name"`
}
type serviceResponse struct {
Services []serviceDescription `json:"services"`
}
type serviceDescription struct {
Name string `json:"name"`
DisplayName string `json:"displayName"`
}
type sloResponse struct {
SLOs []sloDescription `json:"serviceLevelObjectives"`
}
type sloDescription struct {
Name string `json:"name"`
DisplayName string `json:"displayName"`
Goal float64 `json:"goal"`
}
type selectableValue struct {
Value string `json:"value"`
Label string `json:"label"`
Goal float64 `json:"goal,omitempty"`
}