mirror of
https://github.com/opentofu/opentofu.git
synced 2025-01-01 11:47:07 -06:00
683 lines
18 KiB
Go
683 lines
18 KiB
Go
package consul
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
consulapi "github.com/hashicorp/consul/api"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/hashicorp/terraform/internal/states/remote"
|
|
"github.com/hashicorp/terraform/internal/states/statemgr"
|
|
)
|
|
|
|
const (
	// lockSuffix is appended to the sanitized state path to build the key of
	// the Consul lock.
	lockSuffix = "/.lock"
	// lockInfoSuffix is appended to the sanitized state path to build the key
	// under which the lock metadata is stored.
	lockInfoSuffix = "/.lockinfo"

	// The Session TTL associated with this lock.
	lockSessionTTL = "15s"

	// the delay time from when a session is lost to when the
	// lock is released by the server
	lockDelay = 5 * time.Second
	// interval between attempts to reacquire a lost lock
	lockReacquireInterval = 2 * time.Second
)
|
|
|
|
// lostLockErr is returned by unlock when the lock channel has already
// signalled, i.e. the Consul lock was lost before it could be released.
var lostLockErr = errors.New("consul lock was lost")
|
|
|
|
// RemoteClient is a remote client that stores data in Consul.
type RemoteClient struct {
	// Client is the Consul API client used for all KV, session and lock
	// requests.
	Client *consulapi.Client
	// Path is the KV key the state (or the chunk index) is stored at.
	Path string
	// GZip makes Put gzip-compress the state before storing it.
	GZip bool

	// mu is taken by the exported methods and by the lock monitor goroutine
	// before they touch the fields below.
	mu sync.Mutex
	// lockState is true if we're using locks
	lockState bool

	// The index of the last state we wrote.
	// If this is > 0, Put will perform a CAS to ensure that the state wasn't
	// changed during the operation. This is important even with locks, because
	// if the client loses the lock for some reason, then reacquires it, we
	// need to make sure that the state was not modified.
	modifyIndex uint64

	// consulLock is the Consul lock object of the current acquisition and
	// lockCh is the channel returned when it was acquired; unlock treats a
	// receive on lockCh succeeding as "the lock was lost".
	consulLock *consulapi.Lock
	lockCh <-chan struct{}

	// info is the lock metadata passed to Lock; lock() overwrites its ID and
	// Info fields with the Consul session ID.
	info *statemgr.LockInfo

	// cancel our goroutine which is monitoring the lock to automatically
	// reacquire it when possible.
	monitorCancel context.CancelFunc
	monitorWG sync.WaitGroup

	// sessionCancel cancels the Context used for session.RenewPeriodic, and is
	// called when unlocking, or before creating a new lock if the lock is
	// lost.
	sessionCancel context.CancelFunc
}
|
|
|
|
func (c *RemoteClient) Get() (*remote.Payload, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
kv := c.Client.KV()
|
|
|
|
chunked, hash, chunks, pair, err := c.chunkedMode()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if pair == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
c.modifyIndex = pair.ModifyIndex
|
|
|
|
var payload []byte
|
|
if chunked {
|
|
for _, c := range chunks {
|
|
pair, _, err := kv.Get(c, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if pair == nil {
|
|
return nil, fmt.Errorf("Key %q could not be found", c)
|
|
}
|
|
payload = append(payload, pair.Value[:]...)
|
|
}
|
|
} else {
|
|
payload = pair.Value
|
|
}
|
|
|
|
// If the payload starts with 0x1f, it's gzip, not json
|
|
if len(payload) >= 1 && payload[0] == '\x1f' {
|
|
payload, err = uncompressState(payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
md5 := md5.Sum(payload)
|
|
|
|
if hash != "" && fmt.Sprintf("%x", md5) != hash {
|
|
return nil, fmt.Errorf("The remote state does not match the expected hash")
|
|
}
|
|
|
|
return &remote.Payload{
|
|
Data: payload,
|
|
MD5: md5[:],
|
|
}, nil
|
|
}
|
|
|
|
func (c *RemoteClient) Put(data []byte) error {
|
|
// The state can be stored in 4 different ways, based on the payload size
|
|
// and whether the user enabled gzip:
|
|
// - single entry mode with plain JSON: a single JSON is stored at
|
|
// "tfstate/my_project"
|
|
// - single entry mode gzip: the JSON payload is first gziped and stored at
|
|
// "tfstate/my_project"
|
|
// - chunked mode with plain JSON: the JSON payload is split in pieces and
|
|
// stored like so:
|
|
// - "tfstate/my_project" -> a JSON payload that contains the path of
|
|
// the chunks and an MD5 sum like so:
|
|
// {
|
|
// "current-hash": "abcdef1234",
|
|
// "chunks": [
|
|
// "tfstate/my_project/tfstate.abcdef1234/0",
|
|
// "tfstate/my_project/tfstate.abcdef1234/1",
|
|
// "tfstate/my_project/tfstate.abcdef1234/2",
|
|
// ]
|
|
// }
|
|
// - "tfstate/my_project/tfstate.abcdef1234/0" -> The first chunk
|
|
// - "tfstate/my_project/tfstate.abcdef1234/1" -> The next one
|
|
// - ...
|
|
// - chunked mode with gzip: the same system but we gziped the JSON payload
|
|
// before splitting it in chunks
|
|
//
|
|
// When overwritting the current state, we need to clean the old chunks if
|
|
// we were in chunked mode (no matter whether we need to use chunks for the
|
|
// new one). To do so based on the 4 possibilities above we look at the
|
|
// value at "tfstate/my_project" and if it is:
|
|
// - absent then it's a new state and there will be nothing to cleanup,
|
|
// - not a JSON payload we were in single entry mode with gzip so there will
|
|
// be nothing to cleanup
|
|
// - a JSON payload, then we were either single entry mode with plain JSON
|
|
// or in chunked mode. To differentiate between the two we look whether a
|
|
// "current-hash" key is present in the payload. If we find one we were
|
|
// in chunked mode and we will need to remove the old chunks (whether or
|
|
// not we were using gzip does not matter in that case).
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
kv := c.Client.KV()
|
|
|
|
// First we determine what mode we were using and to prepare the cleanup
|
|
chunked, hash, _, _, err := c.chunkedMode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
cleanupOldChunks := func() {}
|
|
if chunked {
|
|
cleanupOldChunks = func() {
|
|
// We ignore all errors that can happen here because we already
|
|
// saved the new state and there is no way to return a warning to
|
|
// the user. We may end up with dangling chunks but there is no way
|
|
// to be sure we won't.
|
|
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
|
|
kv.DeleteTree(path, nil)
|
|
}
|
|
}
|
|
|
|
payload := data
|
|
if c.GZip {
|
|
if compressedState, err := compressState(data); err == nil {
|
|
payload = compressedState
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// default to doing a CAS
|
|
verb := consulapi.KVCAS
|
|
|
|
// Assume a 0 index doesn't need a CAS for now, since we are either
|
|
// creating a new state or purposely overwriting one.
|
|
if c.modifyIndex == 0 {
|
|
verb = consulapi.KVSet
|
|
}
|
|
|
|
// The payload may be too large to store in a single KV entry in Consul. We
|
|
// could try to determine whether it will fit or not before sending the
|
|
// request but since we are using the Transaction API and not the KV API,
|
|
// it grows by about a 1/3 when it is base64 encoded plus the overhead of
|
|
// the fields specific to the Transaction API.
|
|
// Rather than trying to calculate the overhead (which could change from
|
|
// one version of Consul to another, and between Consul Community Edition
|
|
// and Consul Enterprise), we try to send the whole state in one request, if
|
|
// it fails because it is too big we then split it in chunks and send each
|
|
// chunk separately.
|
|
// When splitting in chunks, we make each chunk 524288 bits, which is the
|
|
// default max size for raft. If the user changed it, we still may send
|
|
// chunks too big and fail but this is not a setting that should be fiddled
|
|
// with anyway.
|
|
|
|
store := func(payload []byte) error {
|
|
// KV.Put doesn't return the new index, so we use a single operation
|
|
// transaction to get the new index with a single request.
|
|
txOps := consulapi.KVTxnOps{
|
|
&consulapi.KVTxnOp{
|
|
Verb: verb,
|
|
Key: c.Path,
|
|
Value: payload,
|
|
Index: c.modifyIndex,
|
|
},
|
|
}
|
|
|
|
ok, resp, _, err := kv.Txn(txOps, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// transaction was rolled back
|
|
if !ok {
|
|
return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors)
|
|
}
|
|
|
|
if len(resp.Results) != 1 {
|
|
// this probably shouldn't happen
|
|
return fmt.Errorf("expected on 1 response value, got: %d", len(resp.Results))
|
|
}
|
|
|
|
c.modifyIndex = resp.Results[0].ModifyIndex
|
|
|
|
// We remove all the old chunks
|
|
cleanupOldChunks()
|
|
|
|
return nil
|
|
}
|
|
|
|
if err = store(payload); err == nil {
|
|
// The payload was small enough to be stored
|
|
return nil
|
|
} else if !strings.Contains(err.Error(), "too large") {
|
|
// We failed for some other reason, report this to the user
|
|
return err
|
|
}
|
|
|
|
// The payload was too large so we split it in multiple chunks
|
|
|
|
md5 := md5.Sum(data)
|
|
chunks := split(payload, 524288)
|
|
chunkPaths := make([]string, 0)
|
|
|
|
// First we write the new chunks
|
|
for i, p := range chunks {
|
|
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i)
|
|
chunkPaths = append(chunkPaths, path)
|
|
_, err := kv.Put(&consulapi.KVPair{
|
|
Key: path,
|
|
Value: p,
|
|
}, nil)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Then we update the link to point to the new chunks
|
|
payload, err = json.Marshal(map[string]interface{}{
|
|
"current-hash": fmt.Sprintf("%x", md5),
|
|
"chunks": chunkPaths,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return store(payload)
|
|
}
|
|
|
|
func (c *RemoteClient) Delete() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
kv := c.Client.KV()
|
|
|
|
chunked, hash, _, _, err := c.chunkedMode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = kv.Delete(c.Path, nil)
|
|
|
|
// If there were chunks we need to remove them
|
|
if chunked {
|
|
path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash)
|
|
kv.DeleteTree(path, nil)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *RemoteClient) lockPath() string {
|
|
// we sanitize the path for the lock as Consul does not like having
|
|
// two consecutive slashes for the lock path
|
|
return strings.TrimRight(c.Path, "/")
|
|
}
|
|
|
|
func (c *RemoteClient) putLockInfo(info *statemgr.LockInfo) error {
|
|
info.Path = c.Path
|
|
info.Created = time.Now().UTC()
|
|
|
|
kv := c.Client.KV()
|
|
_, err := kv.Put(&consulapi.KVPair{
|
|
Key: c.lockPath() + lockInfoSuffix,
|
|
Value: info.Marshal(),
|
|
}, nil)
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *RemoteClient) getLockInfo() (*statemgr.LockInfo, error) {
|
|
path := c.lockPath() + lockInfoSuffix
|
|
pair, _, err := c.Client.KV().Get(path, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if pair == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
li := &statemgr.LockInfo{}
|
|
err = json.Unmarshal(pair.Value, li)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error unmarshaling lock info: %s", err)
|
|
}
|
|
|
|
return li, nil
|
|
}
|
|
|
|
func (c *RemoteClient) Lock(info *statemgr.LockInfo) (string, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if !c.lockState {
|
|
return "", nil
|
|
}
|
|
|
|
c.info = info
|
|
|
|
// These checks only are to ensure we strictly follow the specification.
|
|
// Terraform shouldn't ever re-lock, so provide errors for the 2 possible
|
|
// states if this is called.
|
|
select {
|
|
case <-c.lockCh:
|
|
// We had a lock, but lost it.
|
|
return "", errors.New("lost consul lock, cannot re-lock")
|
|
default:
|
|
if c.lockCh != nil {
|
|
// we have an active lock already
|
|
return "", fmt.Errorf("state %q already locked", c.Path)
|
|
}
|
|
}
|
|
|
|
return c.lock()
|
|
}
|
|
|
|
// the lock implementation.
|
|
// Only to be called while holding Client.mu
|
|
func (c *RemoteClient) lock() (string, error) {
|
|
// We create a new session here, so it can be canceled when the lock is
|
|
// lost or unlocked.
|
|
lockSession, err := c.createSession()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// store the session ID for correlation with consul logs
|
|
c.info.Info = "consul session: " + lockSession
|
|
|
|
// A random lock ID has been generated but we override it with the session
|
|
// ID as this will make it easier to manually invalidate the session
|
|
// if needed.
|
|
c.info.ID = lockSession
|
|
|
|
opts := &consulapi.LockOptions{
|
|
Key: c.lockPath() + lockSuffix,
|
|
Session: lockSession,
|
|
|
|
// only wait briefly, so terraform has the choice to fail fast or
|
|
// retry as needed.
|
|
LockWaitTime: time.Second,
|
|
LockTryOnce: true,
|
|
|
|
// Don't let the lock monitor give up right away, as it's possible the
|
|
// session is still OK. While the session is refreshed at a rate of
|
|
// TTL/2, the lock monitor is an idle blocking request and is more
|
|
// susceptible to being closed by a lower network layer.
|
|
MonitorRetries: 5,
|
|
//
|
|
// The delay between lock monitor retries.
|
|
// While the session has a 15s TTL plus a 5s wait period on a lost
|
|
// lock, if we can't get our lock back in 10+ seconds something is
|
|
// wrong so we're going to drop the session and start over.
|
|
MonitorRetryTime: 2 * time.Second,
|
|
}
|
|
|
|
c.consulLock, err = c.Client.LockOpts(opts)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
lockErr := &statemgr.LockError{}
|
|
|
|
lockCh, err := c.consulLock.Lock(make(chan struct{}))
|
|
if err != nil {
|
|
lockErr.Err = err
|
|
return "", lockErr
|
|
}
|
|
|
|
if lockCh == nil {
|
|
lockInfo, e := c.getLockInfo()
|
|
if e != nil {
|
|
lockErr.Err = e
|
|
return "", lockErr
|
|
}
|
|
|
|
lockErr.Info = lockInfo
|
|
|
|
return "", lockErr
|
|
}
|
|
|
|
c.lockCh = lockCh
|
|
|
|
err = c.putLockInfo(c.info)
|
|
if err != nil {
|
|
if unlockErr := c.unlock(c.info.ID); unlockErr != nil {
|
|
err = multierror.Append(err, unlockErr)
|
|
}
|
|
|
|
return "", err
|
|
}
|
|
|
|
// Start a goroutine to monitor the lock state.
|
|
// If we lose the lock to due communication issues with the consul agent,
|
|
// attempt to immediately reacquire the lock. Put will verify the integrity
|
|
// of the state by using a CAS operation.
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c.monitorCancel = cancel
|
|
c.monitorWG.Add(1)
|
|
go func() {
|
|
defer c.monitorWG.Done()
|
|
select {
|
|
case <-c.lockCh:
|
|
log.Println("[ERROR] lost consul lock")
|
|
for {
|
|
c.mu.Lock()
|
|
// We lost our lock, so we need to cancel the session too.
|
|
// The CancelFunc is only replaced while holding Client.mu, so
|
|
// this is safe to call here. This will be replaced by the
|
|
// lock() call below.
|
|
c.sessionCancel()
|
|
|
|
c.consulLock = nil
|
|
_, err := c.lock()
|
|
c.mu.Unlock()
|
|
|
|
if err != nil {
|
|
// We failed to get the lock, keep trying as long as
|
|
// terraform is running. There may be changes in progress,
|
|
// so there's no use in aborting. Either we eventually
|
|
// reacquire the lock, or a Put will fail on a CAS.
|
|
log.Printf("[ERROR] could not reacquire lock: %s", err)
|
|
time.Sleep(lockReacquireInterval)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
continue
|
|
}
|
|
|
|
// if the error was nil, the new lock started a new copy of
|
|
// this goroutine.
|
|
return
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}()
|
|
|
|
if testLockHook != nil {
|
|
testLockHook()
|
|
}
|
|
|
|
return c.info.ID, nil
|
|
}
|
|
|
|
// testLockHook, when non-nil, is called by lock() after a lock is acquired
// and its monitor goroutine has been started.
var testLockHook func()
|
|
|
|
func (c *RemoteClient) createSession() (string, error) {
|
|
// create the context first. Even if the session creation fails, we assume
|
|
// that the CancelFunc is always callable.
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c.sessionCancel = cancel
|
|
|
|
session := c.Client.Session()
|
|
se := &consulapi.SessionEntry{
|
|
Name: consulapi.DefaultLockSessionName,
|
|
TTL: lockSessionTTL,
|
|
LockDelay: lockDelay,
|
|
}
|
|
|
|
id, _, err := session.Create(se, nil)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
log.Println("[INFO] created consul lock session", id)
|
|
|
|
// keep the session renewed
|
|
go session.RenewPeriodic(lockSessionTTL, id, nil, ctx.Done())
|
|
|
|
return id, nil
|
|
}
|
|
|
|
func (c *RemoteClient) Unlock(id string) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if !c.lockState {
|
|
return nil
|
|
}
|
|
|
|
return c.unlock(id)
|
|
}
|
|
|
|
// the unlock implementation.
|
|
// Only to be called while holding Client.mu
|
|
func (c *RemoteClient) unlock(id string) error {
|
|
// This method can be called in two circumstances:
|
|
// - when the plan apply or destroy operation finishes and the lock needs to be released,
|
|
// the watchdog stopped and the session closed
|
|
// - when the user calls `terraform force-unlock <lock_id>` in which case
|
|
// we only need to release the lock.
|
|
|
|
if c.consulLock == nil || c.lockCh == nil {
|
|
// The user called `terraform force-unlock <lock_id>`, we just destroy
|
|
// the session which will release the lock, clean the KV store and quit.
|
|
|
|
_, err := c.Client.Session().Destroy(id, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// We ignore the errors that may happen during cleanup
|
|
kv := c.Client.KV()
|
|
kv.Delete(c.lockPath()+lockSuffix, nil)
|
|
kv.Delete(c.lockPath()+lockInfoSuffix, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
// cancel our monitoring goroutine
|
|
c.monitorCancel()
|
|
|
|
defer func() {
|
|
c.consulLock = nil
|
|
|
|
// The consul session is only used for this single lock, so cancel it
|
|
// after we unlock.
|
|
// The session is only created and replaced holding Client.mu, so the
|
|
// CancelFunc must be non-nil.
|
|
c.sessionCancel()
|
|
}()
|
|
|
|
select {
|
|
case <-c.lockCh:
|
|
return lostLockErr
|
|
default:
|
|
}
|
|
|
|
kv := c.Client.KV()
|
|
|
|
var errs error
|
|
|
|
if _, err := kv.Delete(c.lockPath()+lockInfoSuffix, nil); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
|
|
if err := c.consulLock.Unlock(); err != nil {
|
|
errs = multierror.Append(errs, err)
|
|
}
|
|
|
|
// the monitoring goroutine may be in a select on the lockCh, so we need to
|
|
// wait for it to return before changing the value.
|
|
c.monitorWG.Wait()
|
|
c.lockCh = nil
|
|
|
|
// This is only cleanup, and will fail if the lock was immediately taken by
|
|
// another client, so we don't report an error to the user here.
|
|
c.consulLock.Destroy()
|
|
|
|
return errs
|
|
}
|
|
|
|
// compressState returns data compressed as a gzip stream. The writer is
// written to, flushed and closed in that order, and the first error
// encountered is returned.
func compressState(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)

	steps := []func() error{
		func() error {
			_, err := zw.Write(data)
			return err
		},
		zw.Flush,
		zw.Close,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return nil, err
		}
	}

	return buf.Bytes(), nil
}
|
|
|
|
// uncompressState decompresses a gzip stream and returns its content.
//
// It returns an error when data is not a valid gzip stream, including when
// the stream decompresses but fails the trailing checksum or length check.
// That failure is only reported by the read, not by Close, so the read error
// has to be checked.
func uncompressState(data []byte) ([]byte, error) {
	gz, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}

	b := new(bytes.Buffer)
	if _, err := b.ReadFrom(gz); err != nil {
		// The read error is the one worth reporting; closing is only cleanup.
		gz.Close()
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}
|
|
|
|
// split cuts payload into consecutive chunks of at most limit bytes and
// returns them in order. Every chunk except possibly the last one is exactly
// limit bytes long. The chunks share payload's backing array. An empty
// payload yields an empty, non-nil slice.
//
// A limit that is zero or negative cannot be used as a chunk size; in that
// case the payload is returned as a single chunk instead of panicking.
func split(payload []byte, limit int) [][]byte {
	if limit <= 0 {
		if len(payload) == 0 {
			return [][]byte{}
		}
		return [][]byte{payload}
	}

	chunks := make([][]byte, 0, len(payload)/limit+1)
	for len(payload) >= limit {
		chunks = append(chunks, payload[:limit])
		payload = payload[limit:]
	}
	if len(payload) > 0 {
		chunks = append(chunks, payload)
	}
	return chunks
}
|
|
|
|
func (c *RemoteClient) chunkedMode() (bool, string, []string, *consulapi.KVPair, error) {
|
|
kv := c.Client.KV()
|
|
pair, _, err := kv.Get(c.Path, nil)
|
|
if err != nil {
|
|
return false, "", nil, pair, err
|
|
}
|
|
if pair != nil {
|
|
var d map[string]interface{}
|
|
err = json.Unmarshal(pair.Value, &d)
|
|
// If there is an error when unmarshaling the payload, the state has
|
|
// probably been gziped in single entry mode.
|
|
if err == nil {
|
|
// If we find the "current-hash" key we were in chunked mode
|
|
hash, ok := d["current-hash"]
|
|
if ok {
|
|
chunks := make([]string, 0)
|
|
for _, c := range d["chunks"].([]interface{}) {
|
|
chunks = append(chunks, c.(string))
|
|
}
|
|
return true, hash.(string), chunks, pair, nil
|
|
}
|
|
}
|
|
}
|
|
return false, "", nil, pair, nil
|
|
}
|