opentofu/internal/dag/walk.go

452 lines
12 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
2017-02-02 13:41:46 -06:00
package dag
import (
2017-02-03 04:04:39 -06:00
"errors"
2017-02-02 13:41:46 -06:00
"log"
"sync"
"time"
"github.com/hashicorp/terraform/internal/tfdiags"
2017-02-02 13:41:46 -06:00
)
// Walker is used to walk every vertex of a graph in parallel.
//
// A vertex will only be walked when the dependencies of that vertex have
// been walked. If two vertices can be walked at the same time, they will be.
//
// Update can be called to update the graph. This can be called even during
2019-10-22 04:40:50 -05:00
// a walk, changing vertices/edges mid-walk. This should be done carefully.
// If a vertex is removed but has already been executed, the result of that
// execution (any error) is still returned by Wait. Changing or re-adding
// a vertex that has already executed has no effect. Changing edges of
// a vertex that has already executed has no effect.
//
// Non-parallelism can be enforced by introducing a lock in your callback
// function. However, the goroutine overhead of a walk will remain.
// Walker will create V*2 goroutines (one for each vertex, and dependency
// waiter for each vertex). In general this should be of no concern unless
// there are a huge number of vertices.
2017-02-02 16:43:49 -06:00
//
2017-02-03 04:04:39 -06:00
// The walk is depth first by default. This can be changed with the Reverse
// option.
//
2017-02-02 16:43:49 -06:00
// A single walker is only valid for one graph walk. After the walk is complete
// you must construct a new walker to walk again. State for the walk is never
// deleted in case vertices or edges are changed.
type Walker struct {
2017-02-02 16:43:49 -06:00
// Callback is what is called for each vertex
2017-02-02 13:41:46 -06:00
Callback WalkFunc
2017-02-03 04:04:39 -06:00
// Reverse, if true, causes the source of an edge to depend on a target.
// When false (default), the target depends on the source.
Reverse bool
2017-02-02 16:43:49 -06:00
// changeLock must be held to modify any of the fields below. Only Update
// should modify these fields. Modifying them outside of Update can cause
// serious problems.
2017-02-02 13:41:46 -06:00
changeLock sync.Mutex
2017-02-02 16:43:49 -06:00
vertices Set
edges Set
vertexMap map[Vertex]*walkerVertex
2017-02-02 13:41:46 -06:00
2017-02-02 16:43:49 -06:00
// wait is done when all vertices have executed. It may become "undone"
// if new vertices are added.
wait sync.WaitGroup
// diagsMap contains the diagnostics recorded so far for execution,
// and upstreamFailed contains all the vertices whose problems were
// caused by upstream failures, and thus whose diagnostics should be
// excluded from the final set.
//
// Readers and writers of either map must hold diagsLock.
diagsMap map[Vertex]tfdiags.Diagnostics
upstreamFailed map[Vertex]struct{}
diagsLock sync.Mutex
2017-02-02 13:41:46 -06:00
}
func (w *Walker) init() {
if w.vertices == nil {
w.vertices = make(Set)
}
if w.edges == nil {
w.edges = make(Set)
}
}
2017-02-02 13:41:46 -06:00
type walkerVertex struct {
// These should only be set once on initialization and never written again.
// They are not protected by a lock since they don't need to be since
// they are write-once.
// DoneCh is closed when this vertex has completed execution, regardless
// of success.
//
// CancelCh is closed when the vertex should cancel execution. If execution
// is already complete (DoneCh is closed), this has no effect. Otherwise,
// execution is cancelled as quickly as possible.
DoneCh chan struct{}
CancelCh chan struct{}
2017-02-02 13:41:46 -06:00
// Dependency information. Any changes to any of these fields requires
// holding DepsLock.
//
// DepsCh is sent a single value that denotes whether the upstream deps
// were successful (no errors). Any value sent means that the upstream
// dependencies are complete. No other values will ever be sent again.
//
// DepsUpdateCh is closed when there is a new DepsCh set.
2017-02-03 04:04:39 -06:00
DepsCh chan bool
DepsUpdateCh chan struct{}
DepsLock sync.Mutex
2017-02-02 13:41:46 -06:00
// Below is not safe to read/write in parallel. This behavior is
// enforced by changes only happening in Update. Nothing else should
// ever modify these.
2017-02-02 13:41:46 -06:00
deps map[Vertex]chan struct{}
depsCancelCh chan struct{}
}
// Wait waits for the completion of the walk and returns diagnostics describing
// any problems that arose. Update should be called to populate the walk with
// vertices and edges prior to calling this.
2017-02-02 16:43:49 -06:00
//
// Wait will return as soon as all currently known vertices are complete.
// If you plan on calling Update with more vertices in the future, you
// should not call Wait until after this is done.
func (w *Walker) Wait() tfdiags.Diagnostics {
2017-02-02 13:41:46 -06:00
// Wait for completion
w.wait.Wait()
var diags tfdiags.Diagnostics
w.diagsLock.Lock()
for v, vDiags := range w.diagsMap {
if _, upstream := w.upstreamFailed[v]; upstream {
// Ignore diagnostics for nodes that had failed upstreams, since
// the downstream diagnostics are likely to be redundant.
continue
2017-02-03 04:04:39 -06:00
}
diags = diags.Append(vDiags)
2017-02-02 13:41:46 -06:00
}
w.diagsLock.Unlock()
2017-02-02 13:41:46 -06:00
return diags
2017-02-02 13:41:46 -06:00
}
// Update updates the currently executing walk with the given graph.
// This will perform a diff of the vertices and edges and update the walker.
// Already completed vertices remain completed (including any errors during
// their execution).
2017-02-02 13:41:46 -06:00
//
// This returns immediately once the walker is updated; it does not wait
// for completion of the walk.
//
// Multiple Updates can be called in parallel. Update can be called at any
// time during a walk.
2017-02-03 04:48:09 -06:00
func (w *Walker) Update(g *AcyclicGraph) {
w.init()
v := make(Set)
e := make(Set)
if g != nil {
v, e = g.vertices, g.edges
}
2017-02-02 13:41:46 -06:00
// Grab the change lock so no more updates happen but also so that
// no new vertices are executed during this time since we may be
// removing them.
w.changeLock.Lock()
defer w.changeLock.Unlock()
// Initialize fields
if w.vertexMap == nil {
w.vertexMap = make(map[Vertex]*walkerVertex)
}
// Calculate all our sets
newEdges := e.Difference(w.edges)
2017-02-02 16:38:47 -06:00
oldEdges := w.edges.Difference(e)
newVerts := v.Difference(w.vertices)
2017-02-02 13:41:46 -06:00
oldVerts := w.vertices.Difference(v)
// Add the new vertices
for _, raw := range newVerts {
2017-02-02 13:41:46 -06:00
v := raw.(Vertex)
// Add to the waitgroup so our walk is not done until everything finishes
w.wait.Add(1)
// Add to our own set so we know about it already
w.vertices.Add(raw)
2017-02-02 13:41:46 -06:00
// Initialize the vertex info
info := &walkerVertex{
DoneCh: make(chan struct{}),
CancelCh: make(chan struct{}),
deps: make(map[Vertex]chan struct{}),
2017-02-02 13:41:46 -06:00
}
// Add it to the map and kick off the walk
w.vertexMap[v] = info
}
// Remove the old vertices
for _, raw := range oldVerts {
2017-02-02 13:41:46 -06:00
v := raw.(Vertex)
// Get the vertex info so we can cancel it
info, ok := w.vertexMap[v]
if !ok {
// This vertex for some reason was never in our map. This
// shouldn't be possible.
continue
}
// Cancel the vertex
close(info.CancelCh)
// Delete it out of the map
delete(w.vertexMap, v)
w.vertices.Delete(raw)
2017-02-02 13:41:46 -06:00
}
// Add the new edges
changedDeps := make(Set)
for _, raw := range newEdges {
2017-02-02 13:41:46 -06:00
edge := raw.(Edge)
2017-02-03 04:04:39 -06:00
waiter, dep := w.edgeParts(edge)
2017-02-02 13:41:46 -06:00
// Get the info for the waiter
waiterInfo, ok := w.vertexMap[waiter]
if !ok {
// Vertex doesn't exist... shouldn't be possible but ignore.
continue
}
// Get the info for the dep
depInfo, ok := w.vertexMap[dep]
if !ok {
// Vertex doesn't exist... shouldn't be possible but ignore.
continue
}
// Add the dependency to our waiter
waiterInfo.deps[dep] = depInfo.DoneCh
// Record that the deps changed for this waiter
changedDeps.Add(waiter)
2017-02-02 16:38:47 -06:00
w.edges.Add(raw)
}
// Process removed edges
for _, raw := range oldEdges {
2017-02-02 16:38:47 -06:00
edge := raw.(Edge)
2017-02-03 04:04:39 -06:00
waiter, dep := w.edgeParts(edge)
2017-02-02 16:38:47 -06:00
// Get the info for the waiter
waiterInfo, ok := w.vertexMap[waiter]
if !ok {
// Vertex doesn't exist... shouldn't be possible but ignore.
continue
}
// Delete the dependency from the waiter
delete(waiterInfo.deps, dep)
// Record that the deps changed for this waiter
changedDeps.Add(waiter)
w.edges.Delete(raw)
2017-02-02 13:41:46 -06:00
}
// For each vertex with changed dependencies, we need to kick off
// a new waiter and notify the vertex of the changes.
for _, raw := range changedDeps {
2017-02-02 13:41:46 -06:00
v := raw.(Vertex)
info, ok := w.vertexMap[v]
if !ok {
// Vertex doesn't exist... shouldn't be possible but ignore.
continue
}
// Create a new done channel
2017-02-03 04:04:39 -06:00
doneCh := make(chan bool, 1)
2017-02-02 13:41:46 -06:00
// Create the channel we close for cancellation
cancelCh := make(chan struct{})
// Build a new deps copy
deps := make(map[Vertex]<-chan struct{})
for k, v := range info.deps {
deps[k] = v
}
// Update the update channel
info.DepsLock.Lock()
if info.DepsUpdateCh != nil {
close(info.DepsUpdateCh)
}
info.DepsCh = doneCh
info.DepsUpdateCh = make(chan struct{})
info.DepsLock.Unlock()
// Cancel the older waiter
if info.depsCancelCh != nil {
close(info.depsCancelCh)
}
info.depsCancelCh = cancelCh
2017-02-02 13:41:46 -06:00
// Start the waiter
go w.waitDeps(v, deps, doneCh, cancelCh)
}
// Start all the new vertices. We do this at the end so that all
// the edge waiters and changes are set up above.
for _, raw := range newVerts {
2017-02-02 13:41:46 -06:00
v := raw.(Vertex)
go w.walkVertex(v, w.vertexMap[v])
}
}
2017-02-03 04:04:39 -06:00
// edgeParts returns the waiter and the dependency, in that order.
// The waiter is waiting on the dependency.
func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) {
2017-02-03 04:04:39 -06:00
if w.Reverse {
return e.Source(), e.Target()
}
return e.Target(), e.Source()
}
2017-02-02 13:41:46 -06:00
// walkVertex walks a single vertex, waiting for any dependencies before
// executing the callback.
func (w *Walker) walkVertex(v Vertex, info *walkerVertex) {
2017-02-02 13:41:46 -06:00
// When we're done executing, lower the waitgroup count
defer w.wait.Done()
// When we're done, always close our done channel
defer close(info.DoneCh)
2017-02-03 04:04:39 -06:00
// Wait for our dependencies. We create a [closed] deps channel so
// that we can immediately fall through to load our actual DepsCh.
var depsSuccess bool
var depsUpdateCh chan struct{}
2017-02-03 04:04:39 -06:00
depsCh := make(chan bool, 1)
depsCh <- true
close(depsCh)
2017-02-02 13:41:46 -06:00
for {
select {
case <-info.CancelCh:
// Cancel
return
2017-02-03 04:04:39 -06:00
case depsSuccess = <-depsCh:
// Deps complete! Mark as nil to trigger completion handling.
2017-02-02 13:41:46 -06:00
depsCh = nil
case <-depsUpdateCh:
2017-02-02 13:41:46 -06:00
// New deps, reloop
}
2017-02-02 16:43:49 -06:00
// Check if we have updated dependencies. This can happen if the
2017-04-26 09:10:04 -05:00
// dependencies were satisfied exactly prior to an Update occurring.
2017-02-02 16:43:49 -06:00
// In that case, we'd like to take into account new dependencies
// if possible.
info.DepsLock.Lock()
if info.DepsCh != nil {
depsCh = info.DepsCh
info.DepsCh = nil
}
if info.DepsUpdateCh != nil {
depsUpdateCh = info.DepsUpdateCh
}
info.DepsLock.Unlock()
2017-02-02 13:41:46 -06:00
// If we still have no deps channel set, then we're done!
if depsCh == nil {
break
2017-02-02 13:41:46 -06:00
}
}
2017-02-03 04:04:39 -06:00
// If we passed dependencies, we just want to check once more that
// we're not cancelled, since this can happen just as dependencies pass.
select {
case <-info.CancelCh:
// Cancelled during an update while dependencies completed.
return
default:
}
// Run our callback or note that our upstream failed
var diags tfdiags.Diagnostics
var upstreamFailed bool
2017-02-03 04:04:39 -06:00
if depsSuccess {
diags = w.Callback(v)
2017-02-03 04:04:39 -06:00
} else {
terraform: ugly huge change to weave in new HCL2-oriented types Due to how deeply the configuration types go into Terraform Core, there isn't a great way to switch out to HCL2 gradually. As a consequence, this huge commit gets us from the old state to a _compilable_ new state, but does not yet attempt to fix any tests and has a number of known missing parts and bugs. We will continue to iterate on this in forthcoming commits, heading back towards passing tests and making Terraform fully-functional again. The three main goals here are: - Use the configuration models from the "configs" package instead of the older models in the "config" package, which is now deprecated and preserved only to help us write our migration tool. - Do expression inspection and evaluation using the functionality of the new "lang" package, instead of the Interpolator type and related functionality in the main "terraform" package. - Represent addresses of various objects using types in the addrs package, rather than hand-constructed strings. This is not critical to support the above, but was a big help during the implementation of these other points since it made it much more explicit what kind of address is expected in each context. Since our new packages are built to accommodate some future planned features that are not yet implemented (e.g. the "for_each" argument on resources, "count"/"for_each" on modules), and since there's still a fair amount of functionality still using old-style APIs, there is a moderate amount of shimming here to connect new assumptions with old, hopefully in a way that makes it easier to find and eliminate these shims later. I apologize in advance to the person who inevitably just found this huge commit while spelunking through the commit history.
2018-04-30 12:33:53 -05:00
log.Printf("[TRACE] dag/walk: upstream of %q errored, so skipping", VertexName(v))
// This won't be displayed to the user because we'll set upstreamFailed,
// but we need to ensure there's at least one error in here so that
// the failures will cascade downstream.
diags = diags.Append(errors.New("upstream dependencies failed"))
upstreamFailed = true
2017-02-03 04:04:39 -06:00
}
// Record the result (we must do this after execution because we mustn't
// hold diagsLock while visiting a vertex.)
w.diagsLock.Lock()
if w.diagsMap == nil {
w.diagsMap = make(map[Vertex]tfdiags.Diagnostics)
}
w.diagsMap[v] = diags
if w.upstreamFailed == nil {
w.upstreamFailed = make(map[Vertex]struct{})
2017-02-02 13:41:46 -06:00
}
if upstreamFailed {
w.upstreamFailed[v] = struct{}{}
}
w.diagsLock.Unlock()
2017-02-02 13:41:46 -06:00
}
func (w *Walker) waitDeps(
2017-02-02 13:41:46 -06:00
v Vertex,
deps map[Vertex]<-chan struct{},
2017-02-03 04:04:39 -06:00
doneCh chan<- bool,
2017-02-02 13:41:46 -06:00
cancelCh <-chan struct{}) {
2017-02-02 13:41:46 -06:00
// For each dependency given to us, wait for it to complete
for dep, depCh := range deps {
DepSatisfied:
for {
select {
case <-depCh:
// Dependency satisfied!
break DepSatisfied
case <-cancelCh:
2017-02-03 04:04:39 -06:00
// Wait cancelled. Note that we didn't satisfy dependencies
// so that anything waiting on us also doesn't run.
doneCh <- false
2017-02-02 13:41:46 -06:00
return
case <-time.After(time.Second * 5):
terraform: ugly huge change to weave in new HCL2-oriented types Due to how deeply the configuration types go into Terraform Core, there isn't a great way to switch out to HCL2 gradually. As a consequence, this huge commit gets us from the old state to a _compilable_ new state, but does not yet attempt to fix any tests and has a number of known missing parts and bugs. We will continue to iterate on this in forthcoming commits, heading back towards passing tests and making Terraform fully-functional again. The three main goals here are: - Use the configuration models from the "configs" package instead of the older models in the "config" package, which is now deprecated and preserved only to help us write our migration tool. - Do expression inspection and evaluation using the functionality of the new "lang" package, instead of the Interpolator type and related functionality in the main "terraform" package. - Represent addresses of various objects using types in the addrs package, rather than hand-constructed strings. This is not critical to support the above, but was a big help during the implementation of these other points since it made it much more explicit what kind of address is expected in each context. Since our new packages are built to accommodate some future planned features that are not yet implemented (e.g. the "for_each" argument on resources, "count"/"for_each" on modules), and since there's still a fair amount of functionality still using old-style APIs, there is a moderate amount of shimming here to connect new assumptions with old, hopefully in a way that makes it easier to find and eliminate these shims later. I apologize in advance to the person who inevitably just found this huge commit while spelunking through the commit history.
2018-04-30 12:33:53 -05:00
log.Printf("[TRACE] dag/walk: vertex %q is waiting for %q",
2017-02-02 13:41:46 -06:00
VertexName(v), VertexName(dep))
}
}
}
2017-02-03 04:04:39 -06:00
// Dependencies satisfied! We need to check if any errored
w.diagsLock.Lock()
defer w.diagsLock.Unlock()
for dep := range deps {
if w.diagsMap[dep].HasErrors() {
2017-02-03 04:04:39 -06:00
// One of our dependencies failed, so return false
doneCh <- false
return
}
}
// All dependencies satisfied and successful
doneCh <- true
2017-02-02 13:41:46 -06:00
}