diff --git a/pkg/tsdb/cloudmonitoring/resource_handler.go b/pkg/tsdb/cloudmonitoring/resource_handler.go index fd4ab79b0fe..32141147a08 100644 --- a/pkg/tsdb/cloudmonitoring/resource_handler.go +++ b/pkg/tsdb/cloudmonitoring/resource_handler.go @@ -23,7 +23,7 @@ var nameExp = regexp.MustCompile(`([^\/]*)\/*$`) const resourceManagerPath = "/v1/projects" -type processResponse func(body []byte) ([]byte, error) +type processResponse func(body []byte) ([]json.RawMessage, string, error) func (s *Service) registerRoutes(mux *http.ServeMux) { mux.HandleFunc("/gceDefaultProject", getGCEDefaultProject) @@ -50,35 +50,30 @@ func (s *Service) resourceHandler(subDataSource string, responseFn processRespon writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err)) return } - s.doRequest(rw, req, client, responseFn) + getResources(rw, req, client, responseFn) } } -func (s *Service) doRequest(rw http.ResponseWriter, req *http.Request, cli *http.Client, responseFn processResponse) http.ResponseWriter { - res, err := cli.Do(req) - if err != nil { - writeResponse(rw, http.StatusBadRequest, fmt.Sprintf("unexpected error %v", err)) - return rw - } - defer func() { - if err := res.Body.Close(); err != nil { - slog.Warn("Failed to close response body", "err", err) - } - }() - +func getResources(rw http.ResponseWriter, req *http.Request, cli *http.Client, responseFn processResponse) http.ResponseWriter { if responseFn == nil { writeResponse(rw, http.StatusInternalServerError, "responseFn should not be nil") return rw } - body, code, err := processData(res, responseFn) + responses, headers, encoding, code, err := getResponses(req, cli, responseFn) if err != nil { writeResponse(rw, code, fmt.Sprintf("unexpected error %v", err)) return rw } - writeResponseBytes(rw, res.StatusCode, body) - for k, v := range res.Header { + body, err := buildResponse(responses, encoding) + if err != nil { + writeResponse(rw, http.StatusInternalServerError, fmt.Sprintf("error formatting responose %v", err)) + return rw + } + writeResponseBytes(rw, code, body) + + for k, v := range headers { rw.Header().Set(k, v[0]) for _, v := range v[1:] { rw.Header().Add(k, v) @@ -87,97 +82,113 @@ func (s *Service) doRequest(rw http.ResponseWriter, req *http.Request, cli *http return rw } -func processMetricDescriptors(body []byte) ([]byte, error) { +func processMetricDescriptors(body []byte) ([]json.RawMessage, string, error) { resp := metricDescriptorResponse{} err := json.Unmarshal(body, &resp) if err != nil { - return nil, err + return nil, "", err } + results := []json.RawMessage{} for i := range resp.Descriptors { resp.Descriptors[i].Service = strings.SplitN(resp.Descriptors[i].Type, "/", 2)[0] resp.Descriptors[i].ServiceShortName = strings.SplitN(resp.Descriptors[i].Service, ".", 2)[0] if resp.Descriptors[i].DisplayName == "" { resp.Descriptors[i].DisplayName = resp.Descriptors[i].Type } + descriptor, err := json.Marshal(resp.Descriptors[i]) + if err != nil { + return nil, "", err + } + results = append(results, descriptor) } - return json.Marshal(resp.Descriptors) + return results, resp.Token, nil } -func processServices(body []byte) ([]byte, error) { +func processServices(body []byte) ([]json.RawMessage, string, error) { resp := serviceResponse{} err := json.Unmarshal(body, &resp) if err != nil { - return nil, err + return nil, "", err } - values := []selectableValue{} + results := []json.RawMessage{} for _, service := range resp.Services { name := nameExp.FindString(service.Name) if name == "" { - return nil, fmt.Errorf("unexpected service name: %v", service.Name) + return nil, "", fmt.Errorf("unexpected service name: %v", service.Name) } label := service.DisplayName if label == "" { label = name } - values = append(values, selectableValue{ + marshaledValue, err := json.Marshal(selectableValue{ Value: name, Label: label, }) + if err != nil { + return nil, "", err + } + results = append(results, marshaledValue) } - return json.Marshal(values) + return results, resp.Token, nil } -func processSLOs(body []byte) ([]byte, error) { +func processSLOs(body []byte) ([]json.RawMessage, string, error) { resp := sloResponse{} err := json.Unmarshal(body, &resp) if err != nil { - return nil, err + return nil, "", err } - values := []selectableValue{} + results := []json.RawMessage{} for _, slo := range resp.SLOs { name := nameExp.FindString(slo.Name) if name == "" { - return nil, fmt.Errorf("unexpected service name: %v", slo.Name) + return nil, "", fmt.Errorf("unexpected service name: %v", slo.Name) } - values = append(values, selectableValue{ + marshaledValue, err := json.Marshal(selectableValue{ Value: name, Label: slo.DisplayName, Goal: slo.Goal, }) + if err != nil { + return nil, "", err + } + results = append(results, marshaledValue) } - return json.Marshal(values) + return results, resp.Token, nil } -func processProjects(body []byte) ([]byte, error) { +func processProjects(body []byte) ([]json.RawMessage, string, error) { resp := projectResponse{} err := json.Unmarshal(body, &resp) if err != nil { - return nil, err + return nil, "", err } - values := []selectableValue{} + results := []json.RawMessage{} for _, project := range resp.Projects { - values = append(values, selectableValue{ + marshaledValue, err := json.Marshal(selectableValue{ Value: project.ProjectID, Label: project.Name, }) + if err != nil { + return nil, "", err + } + results = append(results, marshaledValue) } - return json.Marshal(values) + return results, resp.Token, nil } -func processData(res *http.Response, responseFn processResponse) ([]byte, int, error) { - encoding := res.Header.Get("Content-Encoding") - +func decode(encoding string, original io.ReadCloser) ([]byte, error) { var reader io.Reader var err error switch encoding { case "gzip": - reader, err = gzip.NewReader(res.Body) + reader, err = gzip.NewReader(original) if err != nil { - return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err) + return nil, err } defer func() { if err := reader.(io.ReadCloser).Close(); err != nil { @@ -185,45 +196,44 @@ func processData(res *http.Response, responseFn processResponse) ([]byte, int, e } }() case "deflate": - reader = flate.NewReader(res.Body) + reader = flate.NewReader(original) defer func() { if err := reader.(io.ReadCloser).Close(); err != nil { slog.Warn("Failed to close reader body", "err", err) } }() case "br": - reader = brotli.NewReader(res.Body) + reader = brotli.NewReader(original) case "": - reader = res.Body + reader = original default: - return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", err) + return nil, fmt.Errorf("unexpected encoding type %v", err) } body, err := ioutil.ReadAll(reader) if err != nil { - return nil, http.StatusBadRequest, fmt.Errorf("unexpected error %v", err) - } - - body, err = responseFn(body) - if err != nil { - return nil, http.StatusInternalServerError, fmt.Errorf("data processing error %v", err) + return nil, err } + return body, nil +} +func encode(encoding string, body []byte) ([]byte, error) { buf := new(bytes.Buffer) var writer io.Writer = buf + var err error switch encoding { case "gzip": writer = gzip.NewWriter(writer) case "deflate": writer, err = flate.NewWriter(writer, -1) if err != nil { - return nil, http.StatusInternalServerError, fmt.Errorf("unexpected error %v", err) + return nil, err } case "br": writer = brotli.NewWriter(writer) case "": default: - return nil, http.StatusInternalServerError, fmt.Errorf("unexpected encoding type %v", encoding) + return nil, fmt.Errorf("unexpected encoding type %v", encoding) } _, err = writer.Write(body) @@ -233,10 +243,91 @@ func processData(res *http.Response, responseFn processResponse) ([]byte, int, e } } if err != nil { - return nil, http.StatusInternalServerError, fmt.Errorf("unable to encode response %v", err) + return nil, fmt.Errorf("unable to encode response %v", err) + } + return buf.Bytes(), nil +} + +func processData(data io.ReadCloser, encoding string, responseFn processResponse) ([]json.RawMessage, string, int, error) { + body, err := decode(encoding, data) + if err != nil { + return nil, "", http.StatusBadRequest, fmt.Errorf("unable to decode response %v", err) } - return buf.Bytes(), 0, nil + response, token, err := responseFn(body) + if err != nil { + return nil, "", http.StatusInternalServerError, fmt.Errorf("data processing error %v", err) + } + return response, token, 0, nil +} + +type apiResponse struct { + encoding string + header http.Header + responses []json.RawMessage + token string + code int + err error +} + +func doRequest(req *http.Request, cli *http.Client, responseFn processResponse) *apiResponse { + res, err := cli.Do(req) + if err != nil { + return &apiResponse{code: http.StatusBadRequest, err: err} + } + defer func() { + if err := res.Body.Close(); err != nil { + slog.Warn("Failed to close response body", "err", err) + } + }() + encoding := res.Header.Get("Content-Encoding") + originalHeader := res.Header + code := res.StatusCode + + responses, token, errcode, err := processData(res.Body, encoding, responseFn) + if err != nil { + code = errcode + } + return &apiResponse{ + encoding: encoding, + header: originalHeader, + responses: responses, + token: token, + code: code, + err: err, + } +} + +func getResponses(req *http.Request, cli *http.Client, responseFn processResponse) ([]json.RawMessage, http.Header, string, int, error) { + result := doRequest(req, cli, responseFn) + if result.err != nil { + return nil, nil, "", result.code, result.err + } + + token := result.token + responses := result.responses + for token != "" { + query := req.URL.Query() + query.Set("pageToken", token) + req.URL.RawQuery = query.Encode() + + loopResult := doRequest(req, cli, responseFn) + if loopResult.err != nil { + return nil, nil, "", loopResult.code, loopResult.err + } + responses = append(responses, loopResult.responses...) + token = loopResult.token + } + return responses, result.header, result.encoding, result.code, nil +} + +func buildResponse(responses []json.RawMessage, encoding string) ([]byte, error) { + body, err := json.Marshal(responses) + if err != nil { + return nil, fmt.Errorf("response marshaling error %v", err) + } + + return encode(encoding, body) } func (s *Service) setRequestVariables(req *http.Request, subDataSource string) (*http.Client, int, error) { diff --git a/pkg/tsdb/cloudmonitoring/resource_handler_test.go b/pkg/tsdb/cloudmonitoring/resource_handler_test.go index 1a54cc123e7..a98674251fe 100644 --- a/pkg/tsdb/cloudmonitoring/resource_handler_test.go +++ b/pkg/tsdb/cloudmonitoring/resource_handler_test.go @@ -43,15 +43,23 @@ func Test_parseResourcePath(t *testing.T) { } } -func fakeResponseFn(input []byte) ([]byte, error) { - return input, nil -} - func Test_doRequest(t *testing.T) { - // test that it forwards the header and body + // test that it forwards the header and body over multiple calls + elements := []string{"1", "2", "3"} + index := 0 + + fakeResponseFn := func(input []byte) ([]json.RawMessage, string, error) { + results := []json.RawMessage{input} + if index < len(elements) { + return results, "token", nil + } + return results, "", nil + } + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Add("foo", "bar") - _, err := w.Write([]byte("result")) + _, err := w.Write([]byte(elements[index])) + index++ if err != nil { t.Fatal(err) } @@ -60,10 +68,9 @@ func Test_doRequest(t *testing.T) { if err != nil { t.Error(err) } - s := Service{} rw := httptest.NewRecorder() - res := s.doRequest(rw, req, srv.Client(), fakeResponseFn) + res := getResources(rw, req, srv.Client(), fakeResponseFn) if res.Header().Get("foo") != "bar" { t.Errorf("Unexpected headers: %v", res.Header()) } @@ -76,7 +83,7 @@ func Test_doRequest(t *testing.T) { if err != nil { t.Error(err) } - if string(body) != "result" { + if string(body) != "[1,2,3]" { t.Errorf("Unexpected body: %v", string(body)) } } @@ -135,6 +142,7 @@ func Test_processData_functions(t *testing.T) { Description: "baz", }, }, + Token: "foo", } marshaledMDResponse, _ := json.Marshal(metricDescriptorResp) metricDescriptorResult := []metricDescriptor{ @@ -227,35 +235,47 @@ func Test_processData_functions(t *testing.T) { responseFn processResponse input []byte result []byte + token string }{ { "metricDescriptor", processMetricDescriptors, marshaledMDResponse, marshaledMDResult, + "foo", }, { "services", processServices, marshaledServiceResponse, marshaledServiceResult, + "", }, { "slos", processSLOs, marshaledSLOResponse, marshaledSLOResult, + "", }, { "cloudresourcemanager", processProjects, marshaledCRResponse, marshaledServiceResult, + "", }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - res, err := test.responseFn(test.input) + results, token, err := test.responseFn(test.input) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if token != test.token { + t.Errorf("Unexpected token. Got %s, expecting %s", token, test.token) + } + res, err := json.Marshal(results) if err != nil { t.Errorf("Unexpected error %v", err) } diff --git a/pkg/tsdb/cloudmonitoring/types.go b/pkg/tsdb/cloudmonitoring/types.go index 1d413f1c812..77cb29913ac 100644 --- a/pkg/tsdb/cloudmonitoring/types.go +++ b/pkg/tsdb/cloudmonitoring/types.go @@ -192,6 +192,7 @@ type timeSeries struct { type metricDescriptorResponse struct { Descriptors []metricDescriptor `json:"metricDescriptors"` + Token string `json:"nextPageToken"` } type metricDescriptor struct { ValueType string `json:"valueType"` @@ -206,6 +207,7 @@ type metricDescriptor struct { type projectResponse struct { Projects []projectDescription `json:"projects"` + Token string `json:"nextPageToken"` } type projectDescription struct { @@ -215,6 +217,7 @@ type projectDescription struct { type serviceResponse struct { Services []serviceDescription `json:"services"` + Token string `json:"nextPageToken"` } type serviceDescription struct { Name string `json:"name"` @@ -222,7 +225,8 @@ type serviceDescription struct { } type sloResponse struct { - SLOs []sloDescription `json:"serviceLevelObjectives"` + SLOs []sloDescription `json:"serviceLevelObjectives"` + Token string `json:"nextPageToken"` } type sloDescription struct {