mirror of
https://github.com/opentofu/opentofu.git
synced 2025-01-08 07:03:16 -06:00
Merge pull request #14834 from hashicorp/jbardin/state-hook
Persist state more frequently
This commit is contained in:
commit
1d585762dd
@ -126,6 +126,17 @@ func (b *Local) opApply(
|
||||
b.CLI.Output("stopping apply operation...")
|
||||
}
|
||||
|
||||
// try to force a PersistState just in case the process is terminated
|
||||
// before we can complete.
|
||||
if err := opState.PersistState(); err != nil {
|
||||
// We can't error out from here, but warn the user if there was an error.
|
||||
// If this isn't transient, we will catch it again below, and
|
||||
// attempt to save the state another way.
|
||||
if b.CLI != nil {
|
||||
b.CLI.Error(fmt.Sprintf(earlyStateWriteErrorFmt, err))
|
||||
}
|
||||
}
|
||||
|
||||
// Stop execution
|
||||
go tfCtx.Stop()
|
||||
|
||||
@ -270,3 +281,10 @@ importing each resource using its id from the target system.
|
||||
|
||||
This is a serious bug in Terraform and should be reported.
|
||||
`
|
||||
|
||||
const earlyStateWriteErrorFmt = `Error saving current state: %s
|
||||
|
||||
Terraform encountered an error attempting to save the state before canceling
|
||||
the current operation. Once the operation is complete another attempt will be
|
||||
made to save the final state.
|
||||
`
|
||||
|
@ -2,17 +2,27 @@ package local
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/terraform/state"
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
)
|
||||
|
||||
// interval between forced PersistState calls by StateHook
|
||||
const persistStateHookInterval = 10 * time.Second
|
||||
|
||||
// StateHook is a hook that continuously updates the state by calling
|
||||
// WriteState on a state.State.
|
||||
type StateHook struct {
|
||||
terraform.NilHook
|
||||
sync.Mutex
|
||||
|
||||
// lastPersist is the time of the last call to PersistState, for periodic
|
||||
// updates to remote state. PostStateUpdate will force a call PersistState
|
||||
// if it has been more that persistStateHookInterval since the last call to
|
||||
// PersistState.
|
||||
lastPersist time.Time
|
||||
|
||||
State state.State
|
||||
}
|
||||
|
||||
@ -26,8 +36,24 @@ func (h *StateHook) PostStateUpdate(
|
||||
if err := h.State.WriteState(s); err != nil {
|
||||
return terraform.HookActionHalt, err
|
||||
}
|
||||
|
||||
// periodically persist the state
|
||||
if time.Since(h.lastPersist) > persistStateHookInterval {
|
||||
if err := h.persistState(); err != nil {
|
||||
return terraform.HookActionHalt, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Continue forth
|
||||
return terraform.HookActionContinue, nil
|
||||
}
|
||||
|
||||
func (h *StateHook) persistState() error {
|
||||
if h.State != nil {
|
||||
err := h.State.PersistState()
|
||||
h.lastPersist = time.Now()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1,7 +1,9 @@
|
||||
package local
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/terraform/state"
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
@ -27,3 +29,98 @@ func TestStateHook(t *testing.T) {
|
||||
t.Fatalf("bad state: %#v", is.State())
|
||||
}
|
||||
}
|
||||
|
||||
// testPersistState stores the state on WriteState, and
|
||||
type testPersistState struct {
|
||||
*state.InmemState
|
||||
|
||||
mu sync.Mutex
|
||||
persisted bool
|
||||
}
|
||||
|
||||
func (s *testPersistState) WriteState(state *terraform.State) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.persisted = false
|
||||
return s.InmemState.WriteState(state)
|
||||
}
|
||||
|
||||
func (s *testPersistState) PersistState() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.persisted = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// verify that StateHook calls PersistState if the last call was more than
|
||||
// persistStateHookInterval
|
||||
func TestStateHookPersist(t *testing.T) {
|
||||
is := &testPersistState{
|
||||
InmemState: &state.InmemState{},
|
||||
}
|
||||
hook := &StateHook{State: is}
|
||||
|
||||
s := state.TestStateInitial()
|
||||
hook.PostStateUpdate(s)
|
||||
|
||||
// the first call should persist, since the last time was zero
|
||||
if !is.persisted {
|
||||
t.Fatal("PersistState not called")
|
||||
}
|
||||
|
||||
s.Serial++
|
||||
hook.PostStateUpdate(s)
|
||||
|
||||
// this call should not have persisted
|
||||
if is.persisted {
|
||||
t.Fatal("PostStateUpdate called PersistState early")
|
||||
}
|
||||
|
||||
if !is.State().Equal(s) {
|
||||
t.Fatalf("bad state: %#v", is.State())
|
||||
}
|
||||
|
||||
// set the last call back to before our interval
|
||||
hook.lastPersist = time.Now().Add(-2 * persistStateHookInterval)
|
||||
|
||||
s.Serial++
|
||||
hook.PostStateUpdate(s)
|
||||
|
||||
if !is.persisted {
|
||||
t.Fatal("PersistState not called")
|
||||
}
|
||||
|
||||
if !is.State().Equal(s) {
|
||||
t.Fatalf("bad state: %#v", is.State())
|
||||
}
|
||||
}
|
||||
|
||||
// verify that the satet hook is safe for concurrent use
|
||||
func TestStateHookRace(t *testing.T) {
|
||||
is := &state.InmemState{}
|
||||
var hook terraform.Hook = &StateHook{State: is}
|
||||
|
||||
s := state.TestStateInitial()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
action, err := hook.PostStateUpdate(s)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if action != terraform.HookActionContinue {
|
||||
t.Fatalf("bad: %v", action)
|
||||
}
|
||||
if !is.State().Equal(s) {
|
||||
t.Fatalf("bad state: %#v", is.State())
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
@ -1,33 +0,0 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/terraform/state"
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
)
|
||||
|
||||
// StateHook is a hook that continuously updates the state by calling
|
||||
// WriteState on a state.State.
|
||||
type StateHook struct {
|
||||
terraform.NilHook
|
||||
sync.Mutex
|
||||
|
||||
State state.State
|
||||
}
|
||||
|
||||
func (h *StateHook) PostStateUpdate(
|
||||
s *terraform.State) (terraform.HookAction, error) {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.State != nil {
|
||||
// Write the new state
|
||||
if err := h.State.WriteState(s); err != nil {
|
||||
return terraform.HookActionHalt, err
|
||||
}
|
||||
}
|
||||
|
||||
// Continue forth
|
||||
return terraform.HookActionContinue, nil
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/terraform/state"
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
)
|
||||
|
||||
func TestStateHook_impl(t *testing.T) {
|
||||
var _ terraform.Hook = new(StateHook)
|
||||
}
|
||||
|
||||
func TestStateHook(t *testing.T) {
|
||||
is := &state.InmemState{}
|
||||
var hook terraform.Hook = &StateHook{State: is}
|
||||
|
||||
s := state.TestStateInitial()
|
||||
action, err := hook.PostStateUpdate(s)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if action != terraform.HookActionContinue {
|
||||
t.Fatalf("bad: %v", action)
|
||||
}
|
||||
if !is.State().Equal(s) {
|
||||
t.Fatalf("bad state: %#v", is.State())
|
||||
}
|
||||
}
|
@ -1,12 +1,17 @@
|
||||
package state
|
||||
|
||||
import "github.com/hashicorp/terraform/terraform"
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
)
|
||||
|
||||
// BackupState wraps a State that backs up the state on the first time that
|
||||
// a WriteState or PersistState is called.
|
||||
//
|
||||
// If Path exists, it will be overwritten.
|
||||
type BackupState struct {
|
||||
mu sync.Mutex
|
||||
Real State
|
||||
Path string
|
||||
|
||||
@ -22,6 +27,9 @@ func (s *BackupState) RefreshState() error {
|
||||
}
|
||||
|
||||
func (s *BackupState) WriteState(state *terraform.State) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.done {
|
||||
if err := s.backup(); err != nil {
|
||||
return err
|
||||
@ -32,6 +40,9 @@ func (s *BackupState) WriteState(state *terraform.State) error {
|
||||
}
|
||||
|
||||
func (s *BackupState) PersistState() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.done {
|
||||
if err := s.backup(); err != nil {
|
||||
return err
|
||||
|
@ -3,6 +3,7 @@ package state
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@ -31,3 +32,34 @@ func TestBackupState(t *testing.T) {
|
||||
t.Fatalf("bad: %d", fi.Size())
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackupStateRace(t *testing.T) {
|
||||
f, err := ioutil.TempFile("", "tf")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
f.Close()
|
||||
defer os.Remove(f.Name())
|
||||
|
||||
ls := testLocalState(t)
|
||||
defer os.Remove(ls.Path)
|
||||
bs := &BackupState{
|
||||
Real: ls,
|
||||
Path: f.Name(),
|
||||
}
|
||||
|
||||
current := TestStateInitial()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 100; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
bs.WriteState(current)
|
||||
bs.PersistState()
|
||||
bs.RefreshState()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
@ -10,18 +10,28 @@ import (
|
||||
|
||||
// InmemState is an in-memory state storage.
|
||||
type InmemState struct {
|
||||
mu sync.Mutex
|
||||
state *terraform.State
|
||||
}
|
||||
|
||||
func (s *InmemState) State() *terraform.State {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.state.DeepCopy()
|
||||
}
|
||||
|
||||
func (s *InmemState) RefreshState() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *InmemState) WriteState(state *terraform.State) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
state.IncrementSerialMaybe(s.state)
|
||||
s.state = state
|
||||
return nil
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
@ -16,6 +17,8 @@ import (
|
||||
|
||||
// LocalState manages a state storage that is local to the filesystem.
|
||||
type LocalState struct {
|
||||
mu sync.Mutex
|
||||
|
||||
// Path is the path to read the state from. PathOut is the path to
|
||||
// write the state to. If PathOut is not specified, Path will be used.
|
||||
// If PathOut already exists, it will be overwritten.
|
||||
@ -42,6 +45,9 @@ type LocalState struct {
|
||||
|
||||
// SetState will force a specific state in-memory for this local state.
|
||||
func (s *LocalState) SetState(state *terraform.State) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.state = state
|
||||
s.readState = state
|
||||
}
|
||||
@ -58,6 +64,9 @@ func (s *LocalState) State() *terraform.State {
|
||||
//
|
||||
// StateWriter impl.
|
||||
func (s *LocalState) WriteState(state *terraform.State) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.stateFileOut == nil {
|
||||
if err := s.createStateFiles(); err != nil {
|
||||
return nil
|
||||
@ -99,6 +108,9 @@ func (s *LocalState) PersistState() error {
|
||||
|
||||
// StateRefresher impl.
|
||||
func (s *LocalState) RefreshState() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var reader io.Reader
|
||||
if !s.written {
|
||||
// we haven't written a state file yet, so load from Path
|
||||
@ -141,6 +153,9 @@ func (s *LocalState) RefreshState() error {
|
||||
|
||||
// Lock implements a local filesystem state.Locker.
|
||||
func (s *LocalState) Lock(info *LockInfo) (string, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.stateFileOut == nil {
|
||||
if err := s.createStateFiles(); err != nil {
|
||||
return "", err
|
||||
@ -170,6 +185,9 @@ func (s *LocalState) Lock(info *LockInfo) (string, error) {
|
||||
}
|
||||
|
||||
func (s *LocalState) Unlock(id string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.lockID == "" {
|
||||
return fmt.Errorf("LocalState not locked")
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
@ -15,6 +16,22 @@ func TestLocalState(t *testing.T) {
|
||||
TestState(t, ls)
|
||||
}
|
||||
|
||||
func TestLocalStateRace(t *testing.T) {
|
||||
ls := testLocalState(t)
|
||||
defer os.Remove(ls.Path)
|
||||
|
||||
current := TestStateInitial()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 100; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ls.WriteState(current)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalStateLocks(t *testing.T) {
|
||||
s := testLocalState(t)
|
||||
defer os.Remove(s.Path)
|
||||
|
@ -2,6 +2,7 @@ package remote
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/terraform/state"
|
||||
"github.com/hashicorp/terraform/terraform"
|
||||
@ -12,6 +13,8 @@ import (
|
||||
// local caching so every persist will go to the remote storage and local
|
||||
// writes will go to memory.
|
||||
type State struct {
|
||||
mu sync.Mutex
|
||||
|
||||
Client Client
|
||||
|
||||
state, readState *terraform.State
|
||||
@ -19,17 +22,26 @@ type State struct {
|
||||
|
||||
// StateReader impl.
|
||||
func (s *State) State() *terraform.State {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.state.DeepCopy()
|
||||
}
|
||||
|
||||
// StateWriter impl.
|
||||
func (s *State) WriteState(state *terraform.State) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.state = state
|
||||
return nil
|
||||
}
|
||||
|
||||
// StateRefresher impl.
|
||||
func (s *State) RefreshState() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
payload, err := s.Client.Get()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -52,6 +64,9 @@ func (s *State) RefreshState() error {
|
||||
|
||||
// StatePersister impl.
|
||||
func (s *State) PersistState() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.state.IncrementSerialMaybe(s.readState)
|
||||
|
||||
var buf bytes.Buffer
|
||||
@ -64,6 +79,9 @@ func (s *State) PersistState() error {
|
||||
|
||||
// Lock calls the Client's Lock method if it's implemented.
|
||||
func (s *State) Lock(info *state.LockInfo) (string, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if c, ok := s.Client.(ClientLocker); ok {
|
||||
return c.Lock(info)
|
||||
}
|
||||
@ -72,6 +90,9 @@ func (s *State) Lock(info *state.LockInfo) (string, error) {
|
||||
|
||||
// Unlock calls the Client's Unlock method if it's implemented.
|
||||
func (s *State) Unlock(id string) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if c, ok := s.Client.(ClientLocker); ok {
|
||||
return c.Unlock(id)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package remote
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/terraform/state"
|
||||
@ -13,3 +14,24 @@ func TestState_impl(t *testing.T) {
|
||||
var _ state.StateRefresher = new(State)
|
||||
var _ state.Locker = new(State)
|
||||
}
|
||||
|
||||
func TestStateRace(t *testing.T) {
|
||||
s := &State{
|
||||
Client: nilClient{},
|
||||
}
|
||||
|
||||
current := state.TestStateInitial()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.WriteState(current)
|
||||
s.PersistState()
|
||||
s.RefreshState()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user