2023-01-17 13:58:52 -06:00
|
|
|
package historian
|
|
|
|
|
|
|
|
import (
|
2023-01-23 16:31:03 -06:00
|
|
|
"bytes"
|
2023-01-26 03:31:20 -06:00
|
|
|
"context"
|
2023-01-23 16:31:03 -06:00
|
|
|
"encoding/json"
|
2023-01-17 13:58:52 -06:00
|
|
|
"fmt"
|
2023-01-23 16:31:03 -06:00
|
|
|
"io"
|
2023-01-17 13:58:52 -06:00
|
|
|
"net/http"
|
|
|
|
"net/url"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
2023-01-30 16:30:05 -06:00
|
|
|
"github.com/grafana/grafana/pkg/setting"
|
2023-01-17 13:58:52 -06:00
|
|
|
)
|
|
|
|
|
|
|
|
// defaultClientTimeout is the overall request timeout applied to the HTTP
// client built by newLokiClient.
const defaultClientTimeout time.Duration = 30 * time.Second
|
|
|
|
|
2023-01-18 13:24:40 -06:00
|
|
|
// LokiConfig carries the connection settings used by the Loki HTTP client.
type LokiConfig struct {
	// ReadPathURL is the base URL for read requests (used by ping);
	// WritePathURL is the base URL for write requests (used by push).
	ReadPathURL, WritePathURL *url.URL
	// BasicAuthUser and BasicAuthPassword are sent as HTTP basic auth
	// credentials whenever at least one of them is non-empty.
	BasicAuthUser, BasicAuthPassword string
	// TenantID, when non-empty, is sent in the X-Scope-OrgID header.
	TenantID string
	// ExternalLabels is an additional set of key/value labels.
	// NOTE(review): not consumed anywhere in the visible part of this file;
	// presumably attached to pushed streams by the caller — confirm.
	ExternalLabels map[string]string
}
|
|
|
|
|
2023-01-30 16:30:05 -06:00
|
|
|
func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig, error) {
|
|
|
|
read, write := cfg.LokiReadURL, cfg.LokiWriteURL
|
|
|
|
if read == "" {
|
|
|
|
read = cfg.LokiRemoteURL
|
|
|
|
}
|
|
|
|
if write == "" {
|
|
|
|
write = cfg.LokiRemoteURL
|
|
|
|
}
|
|
|
|
|
|
|
|
readURL, err := url.Parse(read)
|
|
|
|
if err != nil {
|
|
|
|
return LokiConfig{}, fmt.Errorf("failed to parse loki remote read URL: %w", err)
|
|
|
|
}
|
|
|
|
writeURL, err := url.Parse(write)
|
|
|
|
if err != nil {
|
|
|
|
return LokiConfig{}, fmt.Errorf("failed to parse loki remote write URL: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return LokiConfig{
|
|
|
|
ReadPathURL: readURL,
|
|
|
|
WritePathURL: writeURL,
|
|
|
|
BasicAuthUser: cfg.LokiBasicAuthUsername,
|
|
|
|
BasicAuthPassword: cfg.LokiBasicAuthPassword,
|
|
|
|
TenantID: cfg.LokiTenantID,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2023-01-17 13:58:52 -06:00
|
|
|
type httpLokiClient struct {
|
|
|
|
client http.Client
|
2023-01-18 13:24:40 -06:00
|
|
|
cfg LokiConfig
|
2023-01-17 13:58:52 -06:00
|
|
|
log log.Logger
|
|
|
|
}
|
|
|
|
|
2023-01-18 13:24:40 -06:00
|
|
|
func newLokiClient(cfg LokiConfig, logger log.Logger) *httpLokiClient {
|
2023-01-17 13:58:52 -06:00
|
|
|
return &httpLokiClient{
|
|
|
|
client: http.Client{
|
|
|
|
Timeout: defaultClientTimeout,
|
|
|
|
},
|
2023-01-18 13:24:40 -06:00
|
|
|
cfg: cfg,
|
2023-01-17 13:58:52 -06:00
|
|
|
log: logger.New("protocol", "http"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-26 03:31:20 -06:00
|
|
|
func (c *httpLokiClient) ping(ctx context.Context) error {
|
2023-01-30 16:30:05 -06:00
|
|
|
uri := c.cfg.ReadPathURL.JoinPath("/loki/api/v1/labels")
|
2023-01-17 13:58:52 -06:00
|
|
|
req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error creating request: %w", err)
|
|
|
|
}
|
2023-01-23 16:31:03 -06:00
|
|
|
c.setAuthAndTenantHeaders(req)
|
2023-01-17 13:58:52 -06:00
|
|
|
|
2023-01-26 03:31:20 -06:00
|
|
|
req = req.WithContext(ctx)
|
2023-01-17 13:58:52 -06:00
|
|
|
res, err := c.client.Do(req)
|
|
|
|
if res != nil {
|
|
|
|
defer func() {
|
|
|
|
if err := res.Body.Close(); err != nil {
|
|
|
|
c.log.Warn("Failed to close response body", "err", err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error sending request: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
2023-01-18 13:24:40 -06:00
|
|
|
return fmt.Errorf("ping request to loki endpoint returned a non-200 status code: %d", res.StatusCode)
|
2023-01-17 13:58:52 -06:00
|
|
|
}
|
2023-01-18 13:24:40 -06:00
|
|
|
c.log.Debug("Ping request to Loki endpoint succeeded", "status", res.StatusCode)
|
2023-01-17 13:58:52 -06:00
|
|
|
return nil
|
|
|
|
}
|
2023-01-23 16:31:03 -06:00
|
|
|
|
|
|
|
type stream struct {
|
|
|
|
Stream map[string]string `json:"stream"`
|
|
|
|
Values []row `json:"values"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// row is a single timestamped entry inside a stream.
type row struct {
	// At is the time of the entry.
	At time.Time
	// Val is the entry's content.
	Val string
}

// MarshalJSON encodes the row as a two-element JSON array of strings: the
// timestamp as Unix nanoseconds in decimal, followed by the value.
//
// The method has a value receiver so that both row and *row satisfy
// json.Marshaler. With a pointer receiver, marshalling a non-addressable row
// value would bypass this method and emit the default object form instead.
func (r row) MarshalJSON() ([]byte, error) {
	return json.Marshal([2]string{
		fmt.Sprintf("%d", r.At.UnixNano()), r.Val,
	})
}
|
|
|
|
|
2023-01-26 03:31:20 -06:00
|
|
|
func (c *httpLokiClient) push(ctx context.Context, s []stream) error {
|
2023-01-23 16:31:03 -06:00
|
|
|
body := struct {
|
|
|
|
Streams []stream `json:"streams"`
|
|
|
|
}{Streams: s}
|
|
|
|
enc, err := json.Marshal(body)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to serialize Loki payload: %w", err)
|
|
|
|
}
|
|
|
|
|
2023-01-30 16:30:05 -06:00
|
|
|
uri := c.cfg.WritePathURL.JoinPath("/loki/api/v1/push")
|
2023-01-23 16:31:03 -06:00
|
|
|
req, err := http.NewRequest(http.MethodPost, uri.String(), bytes.NewBuffer(enc))
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to create Loki request: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
c.setAuthAndTenantHeaders(req)
|
|
|
|
req.Header.Add("content-type", "application/json")
|
|
|
|
|
2023-01-26 03:31:20 -06:00
|
|
|
req = req.WithContext(ctx)
|
2023-01-23 16:31:03 -06:00
|
|
|
resp, err := c.client.Do(req)
|
|
|
|
if resp != nil {
|
|
|
|
defer func() {
|
|
|
|
if err := resp.Body.Close(); err != nil {
|
|
|
|
c.log.Warn("Failed to close response body", "err", err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to send request: %w", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
|
|
byt, _ := io.ReadAll(resp.Body)
|
|
|
|
if len(byt) > 0 {
|
|
|
|
c.log.Error("Error response from Loki", "response", string(byt), "status", resp.StatusCode)
|
|
|
|
} else {
|
|
|
|
c.log.Error("Error response from Loki with an empty body", "status", resp.StatusCode)
|
|
|
|
}
|
|
|
|
return fmt.Errorf("received a non-200 response from loki, status: %d", resp.StatusCode)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *httpLokiClient) setAuthAndTenantHeaders(req *http.Request) {
|
|
|
|
if c.cfg.BasicAuthUser != "" || c.cfg.BasicAuthPassword != "" {
|
|
|
|
req.SetBasicAuth(c.cfg.BasicAuthUser, c.cfg.BasicAuthPassword)
|
|
|
|
}
|
|
|
|
|
|
|
|
if c.cfg.TenantID != "" {
|
|
|
|
req.Header.Add("X-Scope-OrgID", c.cfg.TenantID)
|
|
|
|
}
|
|
|
|
}
|