opentofu/helper/resource/state.go

192 lines
5.2 KiB
Go

package resource
import (
"log"
"sync/atomic"
"time"
)
// StateRefreshFunc is a function type used for StateChangeConf that is
// responsible for refreshing the item being watched for a state change.
//
// It returns three results. `result` is any object that will be returned
// as the final object after waiting for state change. This allows you to
// return the final updated object, for example an EC2 instance after refreshing
// it.
//
// `state` is the latest state of that object. And `err` is any error that
// may have happened while refreshing the state.
type StateRefreshFunc func() (result interface{}, state string, err error)
// StateChangeConf is the configuration struct used for `WaitForState`.
type StateChangeConf struct {
Delay time.Duration // Wait this time before starting checks
Pending []string // States that are "allowed" and will continue trying
Refresh StateRefreshFunc // Refreshes the current state
Target []string // Target state
Timeout time.Duration // The amount of time to wait before timeout
MinTimeout time.Duration // Smallest time to wait before refreshes
PollInterval time.Duration // Override MinTimeout/backoff and only poll this often
NotFoundChecks int // Number of times to allow not found
// This is to work around inconsistent APIs
ContinuousTargetOccurence int // Number of times the Target state has to occur continuously
}
// WaitForState watches an object and waits for it to achieve the state
// specified in the configuration using the specified Refresh() func,
// waiting the number of seconds specified in the timeout configuration.
//
// If the Refresh function returns a error, exit immediately with that error.
//
// If the Refresh function returns a state other than the Target state or one
// listed in Pending, return immediately with an error.
//
// If the Timeout is exceeded before reaching the Target state, return an
// error.
//
// Otherwise, result the result of the first call to the Refresh function to
// reach the target state.
func (conf *StateChangeConf) WaitForState() (interface{}, error) {
log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target)
notfoundTick := 0
targetOccurence := 0
// Set a default for times to check for not found
if conf.NotFoundChecks == 0 {
conf.NotFoundChecks = 20
}
if conf.ContinuousTargetOccurence == 0 {
conf.ContinuousTargetOccurence = 1
}
// We can't safely read the result values if we timeout, so store them in
// an atomic.Value
type Result struct {
Result interface{}
State string
Error error
}
var lastResult atomic.Value
lastResult.Store(Result{})
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
// Wait for the delay
time.Sleep(conf.Delay)
wait := 100 * time.Millisecond
for {
res, currentState, err := conf.Refresh()
result := Result{
Result: res,
State: currentState,
Error: err,
}
lastResult.Store(result)
if err != nil {
return
}
// If we're waiting for the absence of a thing, then return
if res == nil && len(conf.Target) == 0 {
targetOccurence += 1
if conf.ContinuousTargetOccurence == targetOccurence {
return
} else {
continue
}
}
if res == nil {
// If we didn't find the resource, check if we have been
// not finding it for awhile, and if so, report an error.
notfoundTick += 1
if notfoundTick > conf.NotFoundChecks {
result.Error = &NotFoundError{
LastError: err,
Retries: notfoundTick,
}
lastResult.Store(result)
return
}
} else {
// Reset the counter for when a resource isn't found
notfoundTick = 0
found := false
for _, allowed := range conf.Target {
if currentState == allowed {
found = true
targetOccurence += 1
if conf.ContinuousTargetOccurence == targetOccurence {
return
} else {
continue
}
}
}
for _, allowed := range conf.Pending {
if currentState == allowed {
found = true
targetOccurence = 0
break
}
}
if !found {
result.Error = &UnexpectedStateError{
LastError: err,
State: result.State,
ExpectedState: conf.Target,
}
lastResult.Store(result)
return
}
}
// If a poll interval has been specified, choose that interval.
// Otherwise bound the default value.
if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second {
wait = conf.PollInterval
} else {
if wait < conf.MinTimeout {
wait = conf.MinTimeout
} else if wait > 10*time.Second {
wait = 10 * time.Second
}
}
log.Printf("[TRACE] Waiting %s before next try", wait)
time.Sleep(wait)
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
}
}()
select {
case <-doneCh:
r := lastResult.Load().(Result)
return r.Result, r.Error
case <-time.After(conf.Timeout):
r := lastResult.Load().(Result)
return nil, &TimeoutError{
LastError: r.Error,
LastState: r.State,
Timeout: conf.Timeout,
ExpectedState: conf.Target,
}
}
}