mirror of
https://github.com/grafana/grafana.git
synced 2024-11-30 12:44:10 -06:00
CloudMonitor: Iterate over pageToken for resources (#42546)
This commit is contained in:
parent
c3208c1850
commit
a2ad0a0fb6
@ -23,7 +23,7 @@ var nameExp = regexp.MustCompile(`([^\/]*)\/*$`)
|
||||
|
||||
const resourceManagerPath = "/v1/projects"
|
||||
|
||||
type processResponse func(body []byte) ([]byte, error)
|
||||
type processResponse func(body []byte) ([]json.RawMessage, string, error)
|
||||
|
||||
func (s *Service) registerRoutes(mux *http.ServeMux) {
|
||||
mux.HandleFunc("/gceDefaultProject", getGCEDefaultProject)
|
||||
@ -50,35 +50,30 @@ func (s *Service) resourceHandler(subDataSource string, responseFn processRespon
|
||||
writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err))
|
||||
return
|
||||
}
|
||||
s.doRequest(rw, req, client, responseFn)
|
||||
getResources(rw, req, client, responseFn)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) doRequest(rw http.ResponseWriter, req *http.Request, cli *http.Client, responseFn processResponse) http.ResponseWriter {
|
||||
res, err := cli.Do(req)
|
||||
if err != nil {
|
||||
writeResponse(rw, http.StatusBadRequest, fmt.Sprintf("unexpected error %v", err))
|
||||
return rw
|
||||
}
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
slog.Warn("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
func getResources(rw http.ResponseWriter, req *http.Request, cli *http.Client, responseFn processResponse) http.ResponseWriter {
|
||||
if responseFn == nil {
|
||||
writeResponse(rw, http.StatusInternalServerError, "responseFn should not be nil")
|
||||
return rw
|
||||
}
|
||||
|
||||
body, code, err := processData(res, responseFn)
|
||||
responses, headers, encoding, code, err := getResponses(req, cli, responseFn)
|
||||
if err != nil {
|
||||
writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err))
|
||||
return rw
|
||||
}
|
||||
writeResponseBytes(rw, res.StatusCode, body)
|
||||
|
||||
for k, v := range res.Header {
|
||||
body, err := buildResponse(responses, encoding)
|
||||
if err != nil {
|
||||
writeResponse(rw, http.StatusInternalServerError, fmt.Sprintf("error formatting responose %v", err))
|
||||
return rw
|
||||
}
|
||||
writeResponseBytes(rw, code, body)
|
||||
|
||||
for k, v := range headers {
|
||||
rw.Header().Set(k, v[0])
|
||||
for _, v := range v[1:] {
|
||||
rw.Header().Add(k, v)
|
||||
@ -87,97 +82,113 @@ func (s *Service) doRequest(rw http.ResponseWriter, req *http.Request, cli *http
|
||||
return rw
|
||||
}
|
||||
|
||||
func processMetricDescriptors(body []byte) ([]byte, error) {
|
||||
func processMetricDescriptors(body []byte) ([]json.RawMessage, string, error) {
|
||||
resp := metricDescriptorResponse{}
|
||||
err := json.Unmarshal(body, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
results := []json.RawMessage{}
|
||||
for i := range resp.Descriptors {
|
||||
resp.Descriptors[i].Service = strings.SplitN(resp.Descriptors[i].Type, "/", 2)[0]
|
||||
resp.Descriptors[i].ServiceShortName = strings.SplitN(resp.Descriptors[i].Service, ".", 2)[0]
|
||||
if resp.Descriptors[i].DisplayName == "" {
|
||||
resp.Descriptors[i].DisplayName = resp.Descriptors[i].Type
|
||||
}
|
||||
descriptor, err := json.Marshal(resp.Descriptors[i])
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
results = append(results, descriptor)
|
||||
}
|
||||
return json.Marshal(resp.Descriptors)
|
||||
return results, resp.Token, nil
|
||||
}
|
||||
|
||||
func processServices(body []byte) ([]byte, error) {
|
||||
func processServices(body []byte) ([]json.RawMessage, string, error) {
|
||||
resp := serviceResponse{}
|
||||
err := json.Unmarshal(body, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
values := []selectableValue{}
|
||||
results := []json.RawMessage{}
|
||||
for _, service := range resp.Services {
|
||||
name := nameExp.FindString(service.Name)
|
||||
if name == "" {
|
||||
return nil, fmt.Errorf("unexpected service name: %v", service.Name)
|
||||
return nil, "", fmt.Errorf("unexpected service name: %v", service.Name)
|
||||
}
|
||||
label := service.DisplayName
|
||||
if label == "" {
|
||||
label = name
|
||||
}
|
||||
values = append(values, selectableValue{
|
||||
marshaledValue, err := json.Marshal(selectableValue{
|
||||
Value: name,
|
||||
Label: label,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
results = append(results, marshaledValue)
|
||||
}
|
||||
return json.Marshal(values)
|
||||
return results, resp.Token, nil
|
||||
}
|
||||
|
||||
func processSLOs(body []byte) ([]byte, error) {
|
||||
func processSLOs(body []byte) ([]json.RawMessage, string, error) {
|
||||
resp := sloResponse{}
|
||||
err := json.Unmarshal(body, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
values := []selectableValue{}
|
||||
results := []json.RawMessage{}
|
||||
for _, slo := range resp.SLOs {
|
||||
name := nameExp.FindString(slo.Name)
|
||||
if name == "" {
|
||||
return nil, fmt.Errorf("unexpected service name: %v", slo.Name)
|
||||
return nil, "", fmt.Errorf("unexpected service name: %v", slo.Name)
|
||||
}
|
||||
values = append(values, selectableValue{
|
||||
marshaledValue, err := json.Marshal(selectableValue{
|
||||
Value: name,
|
||||
Label: slo.DisplayName,
|
||||
Goal: slo.Goal,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
results = append(results, marshaledValue)
|
||||
}
|
||||
return json.Marshal(values)
|
||||
return results, resp.Token, nil
|
||||
}
|
||||
|
||||
func processProjects(body []byte) ([]byte, error) {
|
||||
func processProjects(body []byte) ([]json.RawMessage, string, error) {
|
||||
resp := projectResponse{}
|
||||
err := json.Unmarshal(body, &resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
values := []selectableValue{}
|
||||
results := []json.RawMessage{}
|
||||
for _, project := range resp.Projects {
|
||||
values = append(values, selectableValue{
|
||||
marshaledValue, err := json.Marshal(selectableValue{
|
||||
Value: project.ProjectID,
|
||||
Label: project.Name,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
results = append(results, marshaledValue)
|
||||
}
|
||||
return json.Marshal(values)
|
||||
return results, resp.Token, nil
|
||||
}
|
||||
|
||||
func processData(res *http.Response, responseFn processResponse) ([]byte, int, error) {
|
||||
encoding := res.Header.Get("Content-Encoding")
|
||||
|
||||
func decode(encoding string, original io.ReadCloser) ([]byte, error) {
|
||||
var reader io.Reader
|
||||
var err error
|
||||
switch encoding {
|
||||
case "gzip":
|
||||
reader, err = gzip.NewReader(res.Body)
|
||||
reader, err = gzip.NewReader(original)
|
||||
if err != nil {
|
||||
return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err)
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err := reader.(io.ReadCloser).Close(); err != nil {
|
||||
@ -185,45 +196,44 @@ func processData(res *http.Response, responseFn processResponse) ([]byte, int, e
|
||||
}
|
||||
}()
|
||||
case "deflate":
|
||||
reader = flate.NewReader(res.Body)
|
||||
reader = flate.NewReader(original)
|
||||
defer func() {
|
||||
if err := reader.(io.ReadCloser).Close(); err != nil {
|
||||
slog.Warn("Failed to close reader body", "err", err)
|
||||
}
|
||||
}()
|
||||
case "br":
|
||||
reader = brotli.NewReader(res.Body)
|
||||
reader = brotli.NewReader(original)
|
||||
case "":
|
||||
reader = res.Body
|
||||
reader = original
|
||||
default:
|
||||
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", err)
|
||||
return nil, fmt.Errorf("unexpected encoding type %v", err)
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(reader)
|
||||
if err != nil {
|
||||
return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err)
|
||||
}
|
||||
|
||||
body, err = responseFn(body)
|
||||
if err != nil {
|
||||
return nil, http.StatusInternalServerError, fmt.Errorf("data processing error %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func encode(encoding string, body []byte) ([]byte, error) {
|
||||
buf := new(bytes.Buffer)
|
||||
var writer io.Writer = buf
|
||||
var err error
|
||||
switch encoding {
|
||||
case "gzip":
|
||||
writer = gzip.NewWriter(writer)
|
||||
case "deflate":
|
||||
writer, err = flate.NewWriter(writer, -1)
|
||||
if err != nil {
|
||||
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected error %v", err)
|
||||
return nil, err
|
||||
}
|
||||
case "br":
|
||||
writer = brotli.NewWriter(writer)
|
||||
case "":
|
||||
default:
|
||||
return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", encoding)
|
||||
return nil, fmt.Errorf("unexpected encoding type %v", encoding)
|
||||
}
|
||||
|
||||
_, err = writer.Write(body)
|
||||
@ -233,10 +243,91 @@ func processData(res *http.Response, responseFn processResponse) ([]byte, int, e
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, http.StatusInternalServerError, fmt.Errorf("unable to encode response %v", err)
|
||||
return nil, fmt.Errorf("unable to encode response %v", err)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func processData(data io.ReadCloser, encoding string, responseFn processResponse) ([]json.RawMessage, string, int, error) {
|
||||
body, err := decode(encoding, data)
|
||||
if err != nil {
|
||||
return nil, "", http.StatusBadRequest, fmt.Errorf("unable to decode response %v", err)
|
||||
}
|
||||
|
||||
return buf.Bytes(), 0, nil
|
||||
response, token, err := responseFn(body)
|
||||
if err != nil {
|
||||
return nil, "", http.StatusInternalServerError, fmt.Errorf("data processing error %v", err)
|
||||
}
|
||||
return response, token, 0, nil
|
||||
}
|
||||
|
||||
type apiResponse struct {
|
||||
encoding string
|
||||
header http.Header
|
||||
responses []json.RawMessage
|
||||
token string
|
||||
code int
|
||||
err error
|
||||
}
|
||||
|
||||
func doRequest(req *http.Request, cli *http.Client, responseFn processResponse) *apiResponse {
|
||||
res, err := cli.Do(req)
|
||||
if err != nil {
|
||||
return &apiResponse{code: http.StatusBadRequest, err: err}
|
||||
}
|
||||
defer func() {
|
||||
if err := res.Body.Close(); err != nil {
|
||||
slog.Warn("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
encoding := res.Header.Get("Content-Encoding")
|
||||
originalHeader := res.Header
|
||||
code := res.StatusCode
|
||||
|
||||
responses, token, errcode, err := processData(res.Body, encoding, responseFn)
|
||||
if err != nil {
|
||||
code = errcode
|
||||
}
|
||||
return &apiResponse{
|
||||
encoding: encoding,
|
||||
header: originalHeader,
|
||||
responses: responses,
|
||||
token: token,
|
||||
code: code,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
func getResponses(req *http.Request, cli *http.Client, responseFn processResponse) ([]json.RawMessage, http.Header, string, int, error) {
|
||||
result := doRequest(req, cli, responseFn)
|
||||
if result.err != nil {
|
||||
return nil, nil, "", result.code, result.err
|
||||
}
|
||||
|
||||
token := result.token
|
||||
responses := result.responses
|
||||
for token != "" {
|
||||
query := req.URL.Query()
|
||||
query.Set("pageToken", token)
|
||||
req.URL.RawQuery = query.Encode()
|
||||
|
||||
loopResult := doRequest(req, cli, responseFn)
|
||||
if loopResult.err != nil {
|
||||
return nil, nil, "", loopResult.code, loopResult.err
|
||||
}
|
||||
responses = append(responses, loopResult.responses...)
|
||||
token = loopResult.token
|
||||
}
|
||||
return responses, result.header, result.encoding, result.code, nil
|
||||
}
|
||||
|
||||
func buildResponse(responses []json.RawMessage, encoding string) ([]byte, error) {
|
||||
body, err := json.Marshal(responses)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("response marshaling error %v", err)
|
||||
}
|
||||
|
||||
return encode(encoding, body)
|
||||
}
|
||||
|
||||
func (s *Service) setRequestVariables(req *http.Request, subDataSource string) (*http.Client, int, error) {
|
||||
|
@ -43,15 +43,23 @@ func Test_parseResourcePath(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func fakeResponseFn(input []byte) ([]byte, error) {
|
||||
return input, nil
|
||||
}
|
||||
|
||||
func Test_doRequest(t *testing.T) {
|
||||
// test that it forwards the header and body
|
||||
// test that it forwards the header and body over multiple calls
|
||||
elements := []string{"1", "2", "3"}
|
||||
index := 0
|
||||
|
||||
fakeResponseFn := func(input []byte) ([]json.RawMessage, string, error) {
|
||||
results := []json.RawMessage{input}
|
||||
if index < len(elements) {
|
||||
return results, "token", nil
|
||||
}
|
||||
return results, "", nil
|
||||
}
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Add("foo", "bar")
|
||||
_, err := w.Write([]byte("result"))
|
||||
_, err := w.Write([]byte(elements[index]))
|
||||
index++
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -60,10 +68,9 @@ func Test_doRequest(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
s := Service{}
|
||||
|
||||
rw := httptest.NewRecorder()
|
||||
res := s.doRequest(rw, req, srv.Client(), fakeResponseFn)
|
||||
res := getResources(rw, req, srv.Client(), fakeResponseFn)
|
||||
if res.Header().Get("foo") != "bar" {
|
||||
t.Errorf("Unexpected headers: %v", res.Header())
|
||||
}
|
||||
@ -76,7 +83,7 @@ func Test_doRequest(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if string(body) != "result" {
|
||||
if string(body) != "[1,2,3]" {
|
||||
t.Errorf("Unexpected body: %v", string(body))
|
||||
}
|
||||
}
|
||||
@ -135,6 +142,7 @@ func Test_processData_functions(t *testing.T) {
|
||||
Description: "baz",
|
||||
},
|
||||
},
|
||||
Token: "foo",
|
||||
}
|
||||
marshaledMDResponse, _ := json.Marshal(metricDescriptorResp)
|
||||
metricDescriptorResult := []metricDescriptor{
|
||||
@ -227,35 +235,47 @@ func Test_processData_functions(t *testing.T) {
|
||||
responseFn processResponse
|
||||
input []byte
|
||||
result []byte
|
||||
token string
|
||||
}{
|
||||
{
|
||||
"metricDescriptor",
|
||||
processMetricDescriptors,
|
||||
marshaledMDResponse,
|
||||
marshaledMDResult,
|
||||
"foo",
|
||||
},
|
||||
{
|
||||
"services",
|
||||
processServices,
|
||||
marshaledServiceResponse,
|
||||
marshaledServiceResult,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"slos",
|
||||
processSLOs,
|
||||
marshaledSLOResponse,
|
||||
marshaledSLOResult,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"cloudresourcemanager",
|
||||
processProjects,
|
||||
marshaledCRResponse,
|
||||
marshaledServiceResult,
|
||||
"",
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
res, err := test.responseFn(test.input)
|
||||
results, token, err := test.responseFn(test.input)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if token != test.token {
|
||||
t.Errorf("Unexpected token. Got %s, expecting %s", token, test.token)
|
||||
}
|
||||
res, err := json.Marshal(results)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
|
@ -192,6 +192,7 @@ type timeSeries struct {
|
||||
|
||||
type metricDescriptorResponse struct {
|
||||
Descriptors []metricDescriptor `json:"metricDescriptors"`
|
||||
Token string `json:"nextPageToken"`
|
||||
}
|
||||
type metricDescriptor struct {
|
||||
ValueType string `json:"valueType"`
|
||||
@ -206,6 +207,7 @@ type metricDescriptor struct {
|
||||
|
||||
type projectResponse struct {
|
||||
Projects []projectDescription `json:"projects"`
|
||||
Token string `json:"nextPageToken"`
|
||||
}
|
||||
|
||||
type projectDescription struct {
|
||||
@ -215,6 +217,7 @@ type projectDescription struct {
|
||||
|
||||
type serviceResponse struct {
|
||||
Services []serviceDescription `json:"services"`
|
||||
Token string `json:"nextPageToken"`
|
||||
}
|
||||
type serviceDescription struct {
|
||||
Name string `json:"name"`
|
||||
@ -222,7 +225,8 @@ type serviceDescription struct {
|
||||
}
|
||||
|
||||
type sloResponse struct {
|
||||
SLOs []sloDescription `json:"serviceLevelObjectives"`
|
||||
SLOs []sloDescription `json:"serviceLevelObjectives"`
|
||||
Token string `json:"nextPageToken"`
|
||||
}
|
||||
|
||||
type sloDescription struct {
|
||||
|
Loading…
Reference in New Issue
Block a user