mirror of
https://github.com/opentofu/opentofu.git
synced 2026-02-18 18:17:54 -05:00
engine/ephemeral: Wire together basic ephemeral functionality (#3710)
Signed-off-by: Christian Mesh <christianmesh1@gmail.com>
This commit is contained in:
parent
b19a648984
commit
8ce780b4e0
32 changed files with 676 additions and 1240 deletions
|
|
@ -7,29 +7,78 @@ package applying
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/engine/internal/exec"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval"
|
||||
"github.com/opentofu/opentofu/internal/shared"
|
||||
"github.com/opentofu/opentofu/internal/states"
|
||||
"github.com/opentofu/opentofu/internal/tfdiags"
|
||||
)
|
||||
|
||||
// EphemeralOpen implements [exec.Operations].
|
||||
func (ops *execOperations) EphemeralOpen(
|
||||
ctx context.Context,
|
||||
desired *eval.DesiredResourceInstance,
|
||||
inst *eval.DesiredResourceInstance,
|
||||
providerClient *exec.ProviderClient,
|
||||
) (*exec.OpenEphemeralResourceInstance, tfdiags.Diagnostics) {
|
||||
log.Printf("[TRACE] apply phase: EphemeralOpen %s using %s", inst.Addr, providerClient.InstanceAddr)
|
||||
|
||||
var diags tfdiags.Diagnostics
|
||||
|
||||
schema, _ := ops.plugins.ResourceTypeSchema(ctx, inst.Provider, inst.Addr.Resource.Resource.Mode, inst.Addr.Resource.Resource.Type)
|
||||
if schema == nil || schema.Block == nil {
|
||||
// Should be caught during validation, so we don't bother with a pretty error here
|
||||
diags = diags.Append(fmt.Errorf("provider %q does not support ephemeral resource %q", inst.ProviderInstance, inst.Addr.Resource.Resource.Type))
|
||||
return nil, diags
|
||||
}
|
||||
|
||||
newVal, closeFunc, openDiags := shared.OpenEphemeralResourceInstance(
|
||||
ctx,
|
||||
inst.Addr,
|
||||
schema.Block,
|
||||
*inst.ProviderInstance,
|
||||
providerClient.Ops,
|
||||
inst.ConfigVal,
|
||||
shared.EphemeralResourceHooks{},
|
||||
)
|
||||
diags = diags.Append(openDiags)
|
||||
if openDiags.HasErrors() {
|
||||
return nil, diags
|
||||
}
|
||||
|
||||
state := &exec.ResourceInstanceObject{
|
||||
InstanceAddr: inst.Addr,
|
||||
State: &states.ResourceInstanceObjectFull{
|
||||
Status: states.ObjectReady,
|
||||
Value: newVal,
|
||||
// TODO Not sure these fields are needed
|
||||
ResourceType: inst.Addr.Resource.Resource.Type,
|
||||
ProviderInstanceAddr: providerClient.InstanceAddr,
|
||||
//SchemaVersion: uint64(schema.Version),
|
||||
//Private: resp.Private,
|
||||
},
|
||||
}
|
||||
|
||||
return &exec.OpenEphemeralResourceInstance{
|
||||
State: state,
|
||||
Close: closeFunc,
|
||||
}, diags
|
||||
}
|
||||
|
||||
// EphemeralState implements [exec.Operations]
|
||||
func (ops *execOperations) EphemeralState(
|
||||
ctx context.Context,
|
||||
ephemeralInst *exec.OpenEphemeralResourceInstance,
|
||||
) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) {
|
||||
log.Printf("[TRACE] apply phase: EphemeralOpen %s using %s", desired.Addr, providerClient.InstanceAddr)
|
||||
panic("unimplemented")
|
||||
return ephemeralInst.State, nil
|
||||
}
|
||||
|
||||
// EphemeralClose implements [exec.Operations].
|
||||
func (ops *execOperations) EphemeralClose(
|
||||
ctx context.Context,
|
||||
object *exec.ResourceInstanceObject,
|
||||
providerClient *exec.ProviderClient,
|
||||
ephemeralInst *exec.OpenEphemeralResourceInstance,
|
||||
) tfdiags.Diagnostics {
|
||||
log.Printf("[TRACE] apply phase: EphemeralClose %s using %s", object.InstanceAddr, providerClient.InstanceAddr)
|
||||
panic("unimplemented")
|
||||
return ephemeralInst.Close(ctx)
|
||||
}
|
||||
|
|
|
|||
17
internal/engine/internal/exec/ephemeral.go
Normal file
17
internal/engine/internal/exec/ephemeral.go
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
// Copyright (c) The OpenTofu Authors
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Copyright (c) 2023 HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package exec
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/tfdiags"
|
||||
)
|
||||
|
||||
type OpenEphemeralResourceInstance struct {
|
||||
State *ResourceInstanceObject
|
||||
Close func(context.Context) tfdiags.Diagnostics
|
||||
}
|
||||
|
|
@ -271,19 +271,25 @@ type Operations interface {
|
|||
// detail of whatever is managing a provider's operation, with the execution
|
||||
// graph just assuming that ephemeral objects remain valid _somehow_ for
|
||||
// the full duration of their use.
|
||||
EphemeralOpen(
|
||||
ctx context.Context,
|
||||
desired *eval.DesiredResourceInstance,
|
||||
providerClient *ProviderClient,
|
||||
) (*OpenEphemeralResourceInstance, tfdiags.Diagnostics)
|
||||
|
||||
// EphemeralState refines the open ephemeral resource instance into the
|
||||
// required resource object state
|
||||
//
|
||||
// Execution graph processing automatically passes the result of this
|
||||
// function to [Operations.ResourceInstancePostconditions] when appropriate,
|
||||
// propagating any additional diagnostics it returns, and so implementers of
|
||||
// this method should not attempt to handle postconditions themselves.
|
||||
EphemeralOpen(
|
||||
EphemeralState(
|
||||
ctx context.Context,
|
||||
desired *eval.DesiredResourceInstance,
|
||||
providerClient *ProviderClient,
|
||||
ephemeral *OpenEphemeralResourceInstance,
|
||||
) (*ResourceInstanceObject, tfdiags.Diagnostics)
|
||||
|
||||
// EphemeralClose uses the given provider client to "close" the given
|
||||
// ephemeral object.
|
||||
// EphemeralClose calls Close on the open ephemeral resource instance
|
||||
//
|
||||
// A valid execution graph ensures that this is called only after all other
|
||||
// operations using the given object have either completed or have been
|
||||
|
|
@ -292,7 +298,6 @@ type Operations interface {
|
|||
// after this method returns.
|
||||
EphemeralClose(
|
||||
ctx context.Context,
|
||||
object *ResourceInstanceObject,
|
||||
providerClient *ProviderClient,
|
||||
ephemeral *OpenEphemeralResourceInstance,
|
||||
) tfdiags.Diagnostics
|
||||
}
|
||||
|
|
|
|||
|
|
@ -214,22 +214,30 @@ func (b *Builder) DataRead(
|
|||
func (b *Builder) EphemeralOpen(
|
||||
desiredInst ResultRef[*eval.DesiredResourceInstance],
|
||||
providerClient ResultRef[*exec.ProviderClient],
|
||||
) ResourceInstanceResultRef {
|
||||
return operationRef[*exec.ResourceInstanceObject](b, operationDesc{
|
||||
) ResultRef[*exec.OpenEphemeralResourceInstance] {
|
||||
return operationRef[*exec.OpenEphemeralResourceInstance](b, operationDesc{
|
||||
opCode: opEphemeralOpen,
|
||||
operands: []AnyResultRef{desiredInst, providerClient},
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Builder) EphemeralState(
|
||||
ephemeralInst ResultRef[*exec.OpenEphemeralResourceInstance],
|
||||
) ResourceInstanceResultRef {
|
||||
return operationRef[*exec.ResourceInstanceObject](b, operationDesc{
|
||||
opCode: opEphemeralState,
|
||||
operands: []AnyResultRef{ephemeralInst},
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Builder) EphemeralClose(
|
||||
obj ResourceInstanceResultRef,
|
||||
providerClient ResultRef[*exec.ProviderClient],
|
||||
ephemeralInst ResultRef[*exec.OpenEphemeralResourceInstance],
|
||||
waitFor AnyResultRef,
|
||||
) ResultRef[struct{}] {
|
||||
waiter := b.ensureWaiterRef(waitFor)
|
||||
return operationRef[struct{}](b, operationDesc{
|
||||
opCode: opEphemeralClose,
|
||||
operands: []AnyResultRef{obj, providerClient, waiter},
|
||||
operands: []AnyResultRef{ephemeralInst, waiter},
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -123,6 +123,8 @@ func (c *compiler) Compile() (*CompiledGraph, tfdiags.Diagnostics) {
|
|||
compileFunc = c.compileOpDataRead
|
||||
case opEphemeralOpen:
|
||||
compileFunc = c.compileOpEphemeralOpen
|
||||
case opEphemeralState:
|
||||
compileFunc = c.compileOpEphemeralState
|
||||
case opEphemeralClose:
|
||||
compileFunc = c.compileOpEphemeralClose
|
||||
default:
|
||||
|
|
|
|||
|
|
@ -339,15 +339,12 @@ func (c *compiler) compileOpEphemeralOpen(operands *compilerOperands) nodeExecut
|
|||
|
||||
ret, moreDiags := ops.EphemeralOpen(ctx, desired, providerClient)
|
||||
diags = diags.Append(moreDiags)
|
||||
// TODO: Also call ops.ResourceInstancePostconditions
|
||||
log.Printf("[WARN] opEphemeralOpen doesn't yet handle postconditions")
|
||||
return ret, !diags.HasErrors(), diags
|
||||
}
|
||||
}
|
||||
|
||||
func (c *compiler) compileOpEphemeralClose(operands *compilerOperands) nodeExecuteRaw {
|
||||
getObject := nextOperand[*exec.ResourceInstanceObject](operands)
|
||||
getProviderClient := nextOperand[*exec.ProviderClient](operands)
|
||||
func (c *compiler) compileOpEphemeralState(operands *compilerOperands) nodeExecuteRaw {
|
||||
getEphemeral := nextOperand[*exec.OpenEphemeralResourceInstance](operands)
|
||||
diags := operands.Finish()
|
||||
c.diags = c.diags.Append(diags)
|
||||
if diags.HasErrors() {
|
||||
|
|
@ -356,18 +353,42 @@ func (c *compiler) compileOpEphemeralClose(operands *compilerOperands) nodeExecu
|
|||
ops := c.ops
|
||||
|
||||
return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
|
||||
providerClient, ok, moreDiags := getProviderClient(ctx)
|
||||
diags = diags.Append(moreDiags)
|
||||
if !ok {
|
||||
return nil, false, diags
|
||||
}
|
||||
object, ok, moreDiags := getObject(ctx)
|
||||
ephemeral, ok, moreDiags := getEphemeral(ctx)
|
||||
diags = diags.Append(moreDiags)
|
||||
if !ok {
|
||||
return nil, false, diags
|
||||
}
|
||||
|
||||
moreDiags = ops.EphemeralClose(ctx, object, providerClient)
|
||||
ret, moreDiags := ops.EphemeralState(ctx, ephemeral)
|
||||
diags = diags.Append(moreDiags)
|
||||
// TODO: Also call ops.ResourceInstancePostconditions
|
||||
log.Printf("[WARN] opEphemeralState doesn't yet handle postconditions")
|
||||
return ret, !diags.HasErrors(), diags
|
||||
}
|
||||
}
|
||||
|
||||
func (c *compiler) compileOpEphemeralClose(operands *compilerOperands) nodeExecuteRaw {
|
||||
getEphemeral := nextOperand[*exec.OpenEphemeralResourceInstance](operands)
|
||||
waitForUsers := operands.OperandWaiter()
|
||||
diags := operands.Finish()
|
||||
c.diags = c.diags.Append(diags)
|
||||
if diags.HasErrors() {
|
||||
return nil
|
||||
}
|
||||
ops := c.ops
|
||||
|
||||
return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
|
||||
// We intentionally ignore results here because we want to close the
|
||||
// ephemeral even if one of its users fails.
|
||||
waitForUsers(ctx)
|
||||
|
||||
ephemeral, ok, moreDiags := getEphemeral(ctx)
|
||||
diags = diags.Append(moreDiags)
|
||||
if !ok {
|
||||
return nil, false, diags
|
||||
}
|
||||
|
||||
moreDiags = ops.EphemeralClose(ctx, ephemeral)
|
||||
diags = diags.Append(moreDiags)
|
||||
return struct{}{}, !diags.HasErrors(), diags
|
||||
}
|
||||
|
|
|
|||
|
|
@ -135,6 +135,8 @@ func unmarshalOperationElem(protoOp *execgraphproto.Operation, prevResults []Any
|
|||
return unmarshalOpDataRead(protoOp.GetOperands(), prevResults, builder)
|
||||
case opEphemeralOpen:
|
||||
return unmarshalOpEphemeralOpen(protoOp.GetOperands(), prevResults, builder)
|
||||
case opEphemeralState:
|
||||
return unmarshalOpEphemeralState(protoOp.GetOperands(), prevResults, builder)
|
||||
case opEphemeralClose:
|
||||
return unmarshalOpEphemeralClose(protoOp.GetOperands(), prevResults, builder)
|
||||
default:
|
||||
|
|
@ -310,23 +312,30 @@ func unmarshalOpEphemeralOpen(rawOperands []uint64, prevResults []AnyResultRef,
|
|||
return builder.EphemeralOpen(desiredInst, providerClient), nil
|
||||
}
|
||||
|
||||
func unmarshalOpEphemeralClose(rawOperands []uint64, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) {
|
||||
if len(rawOperands) != 3 {
|
||||
func unmarshalOpEphemeralState(rawOperands []uint64, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) {
|
||||
if len(rawOperands) != 1 {
|
||||
return nil, fmt.Errorf("wrong number of operands (%d) for opDataRead", len(rawOperands))
|
||||
}
|
||||
obj, err := unmarshalGetPrevResultOf[*exec.ResourceInstanceObject](prevResults, rawOperands[0])
|
||||
ephemeralInst, err := unmarshalGetPrevResultOf[*exec.OpenEphemeralResourceInstance](prevResults, rawOperands[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid opDataRead desiredInst: %w", err)
|
||||
}
|
||||
providerClient, err := unmarshalGetPrevResultOf[*exec.ProviderClient](prevResults, rawOperands[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid opDataRead providerClient: %w", err)
|
||||
return builder.EphemeralState(ephemeralInst), nil
|
||||
}
|
||||
|
||||
func unmarshalOpEphemeralClose(rawOperands []uint64, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) {
|
||||
if len(rawOperands) != 2 {
|
||||
return nil, fmt.Errorf("wrong number of operands (%d) for opDataRead", len(rawOperands))
|
||||
}
|
||||
waitFor, err := unmarshalGetPrevResultWaiter(prevResults, rawOperands[2])
|
||||
ephemeralInst, err := unmarshalGetPrevResultOf[*exec.OpenEphemeralResourceInstance](prevResults, rawOperands[0])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid opDataRead desiredInst: %w", err)
|
||||
}
|
||||
waitFor, err := unmarshalGetPrevResultWaiter(prevResults, rawOperands[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid opDataRead waitFor: %w", err)
|
||||
}
|
||||
return builder.EphemeralClose(obj, providerClient, waitFor), nil
|
||||
return builder.EphemeralClose(ephemeralInst, waitFor), nil
|
||||
}
|
||||
|
||||
func unmarshalWaiterElem(protoWaiter *execgraphproto.Waiter, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) {
|
||||
|
|
|
|||
|
|
@ -19,12 +19,13 @@ func _() {
|
|||
_ = x[opManagedAlreadyDeposed-9]
|
||||
_ = x[opDataRead-10]
|
||||
_ = x[opEphemeralOpen-11]
|
||||
_ = x[opEphemeralClose-12]
|
||||
_ = x[opEphemeralState-12]
|
||||
_ = x[opEphemeralClose-13]
|
||||
}
|
||||
|
||||
const _opCode_name = "opProviderInstanceConfigopProviderInstanceOpenopProviderInstanceCloseopResourceInstanceDesiredopResourceInstancePrioropManagedFinalPlanopManagedApplyopManagedDeposeopManagedAlreadyDeposedopDataReadopEphemeralOpenopEphemeralClose"
|
||||
const _opCode_name = "opProviderInstanceConfigopProviderInstanceOpenopProviderInstanceCloseopResourceInstanceDesiredopResourceInstancePrioropManagedFinalPlanopManagedApplyopManagedDeposeopManagedAlreadyDeposedopDataReadopEphemeralOpenopEphemeralStateopEphemeralClose"
|
||||
|
||||
var _opCode_index = [...]uint8{0, 24, 46, 69, 94, 117, 135, 149, 164, 187, 197, 212, 228}
|
||||
var _opCode_index = [...]uint8{0, 24, 46, 69, 94, 117, 135, 149, 164, 187, 197, 212, 228, 244}
|
||||
|
||||
func (i opCode) String() string {
|
||||
idx := int(i) - 1
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ const (
|
|||
opDataRead
|
||||
|
||||
opEphemeralOpen
|
||||
opEphemeralState
|
||||
opEphemeralClose
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -23,8 +23,9 @@ type mockOperations struct {
|
|||
Calls []mockOperationsCall
|
||||
|
||||
DataReadFunc func(ctx context.Context, desired *eval.DesiredResourceInstance, plannedVal cty.Value, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics)
|
||||
EphemeralCloseFunc func(ctx context.Context, object *exec.ResourceInstanceObject, providerClient *exec.ProviderClient) tfdiags.Diagnostics
|
||||
EphemeralOpenFunc func(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics)
|
||||
EphemeralCloseFunc func(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) tfdiags.Diagnostics
|
||||
EphemeralOpenFunc func(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.OpenEphemeralResourceInstance, tfdiags.Diagnostics)
|
||||
EphemeralStateFunc func(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) (*exec.ResourceInstanceObject, tfdiags.Diagnostics)
|
||||
ManagedAlreadyDeposedFunc func(ctx context.Context, instAddr addrs.AbsResourceInstance, deposedKey states.DeposedKey) (*exec.ResourceInstanceObject, tfdiags.Diagnostics)
|
||||
ManagedApplyFunc func(ctx context.Context, plan *exec.ManagedResourceObjectFinalPlan, fallback *exec.ResourceInstanceObject, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics)
|
||||
ManagedDeposeFunc func(ctx context.Context, instAddr addrs.AbsResourceInstance) (*exec.ResourceInstanceObject, tfdiags.Diagnostics)
|
||||
|
|
@ -53,19 +54,19 @@ func (m *mockOperations) DataRead(ctx context.Context, desired *eval.DesiredReso
|
|||
}
|
||||
|
||||
// EphemeralClose implements [exec.Operations].
|
||||
func (m *mockOperations) EphemeralClose(ctx context.Context, object *exec.ResourceInstanceObject, providerClient *exec.ProviderClient) tfdiags.Diagnostics {
|
||||
func (m *mockOperations) EphemeralClose(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) tfdiags.Diagnostics {
|
||||
var diags tfdiags.Diagnostics
|
||||
if m.EphemeralCloseFunc != nil {
|
||||
diags = m.EphemeralCloseFunc(ctx, object, providerClient)
|
||||
diags = m.EphemeralCloseFunc(ctx, ephemeral)
|
||||
}
|
||||
m.appendLog("EphemeralClose", []any{object, providerClient}, struct{}{})
|
||||
m.appendLog("EphemeralClose", []any{ephemeral}, struct{}{})
|
||||
return diags
|
||||
}
|
||||
|
||||
// EphemeralOpen implements [exec.Operations].
|
||||
func (m *mockOperations) EphemeralOpen(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) {
|
||||
func (m *mockOperations) EphemeralOpen(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.OpenEphemeralResourceInstance, tfdiags.Diagnostics) {
|
||||
var diags tfdiags.Diagnostics
|
||||
var result *exec.ResourceInstanceObject
|
||||
var result *exec.OpenEphemeralResourceInstance
|
||||
if m.EphemeralOpenFunc != nil {
|
||||
result, diags = m.EphemeralOpenFunc(ctx, desired, providerClient)
|
||||
}
|
||||
|
|
@ -73,6 +74,17 @@ func (m *mockOperations) EphemeralOpen(ctx context.Context, desired *eval.Desire
|
|||
return result, diags
|
||||
}
|
||||
|
||||
// EphemeralState implements [exec.Operations].
|
||||
func (m *mockOperations) EphemeralState(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) {
|
||||
var diags tfdiags.Diagnostics
|
||||
var result *exec.ResourceInstanceObject
|
||||
if m.EphemeralStateFunc != nil {
|
||||
result, diags = m.EphemeralStateFunc(ctx, ephemeral)
|
||||
}
|
||||
m.appendLog("EphemeralState", []any{ephemeral}, result)
|
||||
return result, diags
|
||||
}
|
||||
|
||||
// ManagedAlreadyDeposed implements [exec.Operations].
|
||||
func (m *mockOperations) ManagedAlreadyDeposed(ctx context.Context, instAddr addrs.AbsResourceInstance, deposedKey states.DeposedKey) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) {
|
||||
var diags tfdiags.Diagnostics
|
||||
|
|
|
|||
|
|
@ -1,149 +0,0 @@
|
|||
// Copyright (c) The OpenTofu Authors
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Copyright (c) 2023 HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package lifecycle
|
||||
|
||||
import (
|
||||
"iter"
|
||||
"maps"
|
||||
"sync"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/collections"
|
||||
)
|
||||
|
||||
// CompletionTracker is a synchronization utility that keeps a record of the
|
||||
// completion of various items and allows various different goroutines to wait
|
||||
// for the completion of different subsets of the items.
|
||||
//
|
||||
// "Items" can be of any comparable type, but the design intention is that a
|
||||
// caller will define its own types to represent the different kinds of work
|
||||
// it needs to track.
|
||||
type CompletionTracker[T comparable] struct {
|
||||
mu sync.Mutex
|
||||
completed collections.Set[T]
|
||||
waiters collections.Set[*completionWaiter[T]]
|
||||
}
|
||||
|
||||
type completionWaiter[T comparable] struct {
|
||||
pending collections.Set[T]
|
||||
ch chan<- struct{}
|
||||
}
|
||||
|
||||
// NewCompletionTracker returns a new [CompletionTracker] that initially
|
||||
// has no waiters and no completed items.
|
||||
func NewCompletionTracker[T comparable]() *CompletionTracker[T] {
|
||||
return &CompletionTracker[T]{
|
||||
completed: collections.NewSet[T](),
|
||||
waiters: collections.NewSet[*completionWaiter[T]](),
|
||||
}
|
||||
}
|
||||
|
||||
// ItemComplete returns true if the given item has already been reported
|
||||
// as complete using [CompletionTracker.ReportCompletion].
|
||||
//
|
||||
// A complete item can never become incomplete again, but if this function
|
||||
// returns false then a concurrent goroutine could potentially report the
|
||||
// item as complete before the caller acts on that result.
|
||||
func (t *CompletionTracker[T]) ItemComplete(item T) bool {
|
||||
t.mu.Lock()
|
||||
_, ret := t.completed[item]
|
||||
t.mu.Unlock()
|
||||
return ret
|
||||
}
|
||||
|
||||
// NewWaiterFor returns an unbuffered channel that will be closed once all
|
||||
// of the addresses in the given seqence have had their completion reported
|
||||
// using [CompletionTracker.ReportCompletion].
|
||||
//
|
||||
// No items will be sent to the channel.
|
||||
//
|
||||
// For callers that would just immediately block waiting for the given channel
|
||||
// to be closed (without using it as part of a larger "select" statement),
|
||||
// consider using the simpler [CompletionTracker.WaitFor] instead.
|
||||
func (t *CompletionTracker[T]) NewWaiterFor(waitFor iter.Seq[T]) <-chan struct{} {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
ch := make(chan struct{})
|
||||
waiter := &completionWaiter[T]{
|
||||
pending: collections.NewSet[T](),
|
||||
ch: ch,
|
||||
}
|
||||
for item := range waitFor {
|
||||
if t.completed.Has(item) {
|
||||
continue // ignore any already-completed items
|
||||
}
|
||||
waiter.pending[item] = struct{}{}
|
||||
}
|
||||
|
||||
if len(waiter.pending) == 0 {
|
||||
// If we didn't find any addresses that were not already completed
|
||||
// then we'll just close the channel immediately before we return,
|
||||
// and not track the waiter at all.
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
// If we have at least one item to wait for then we'll remember this
|
||||
// new tracker so we can reconsider it each time something has its
|
||||
// completion reported.
|
||||
t.waiters[waiter] = struct{}{}
|
||||
return ch
|
||||
}
|
||||
|
||||
// WaitFor blocks until all of the addresses in the given set have had their
|
||||
// completion reported using [CompletionTracker.ReportCompletion].
|
||||
//
|
||||
// This is a convenience wrapper for [CompletionTracker.NewWaiterFor] that
|
||||
// just blocks until the returned channel is closed.
|
||||
func (t *CompletionTracker[T]) WaitFor(waitFor iter.Seq[T]) {
|
||||
ch := t.NewWaiterFor(waitFor)
|
||||
for range ch {
|
||||
// just block until the channel is closed
|
||||
}
|
||||
}
|
||||
|
||||
// ReportCompletion records the completion of the given item and signals
|
||||
// any waiters for which it was the last remaining pending item.
|
||||
func (t *CompletionTracker[T]) ReportCompletion(of T) {
|
||||
t.mu.Lock()
|
||||
t.completed[of] = struct{}{}
|
||||
for waiter := range t.waiters {
|
||||
delete(waiter.pending, of)
|
||||
if len(waiter.pending) == 0 {
|
||||
// nothing left for this waiter to wait for
|
||||
close(waiter.ch)
|
||||
delete(t.waiters, waiter)
|
||||
}
|
||||
}
|
||||
t.mu.Unlock()
|
||||
}
|
||||
|
||||
// PendingItems returns a set of all of the items that are pending for at
|
||||
// least one waiter at the time of the call.
|
||||
//
|
||||
// This is mainly to allow detection and cleanup of uncompleted work: once
|
||||
// a caller thinks that all work ought to have completed it can call this
|
||||
// function and should hopefully receive an empty set. If not, it can report
|
||||
// an error about certain items being left unresolved and optionally make
|
||||
// synthetic calls to [CompletionTracker.ReportCompletion] to cause all of the
|
||||
// remaining waiters to be unblocked.
|
||||
//
|
||||
// The result is a fresh set allocated for each call, so the caller is free
|
||||
// to modify that set without corrupting the internal state of the
|
||||
// [CompletionTracker].
|
||||
func (t *CompletionTracker[T]) PendingItems() collections.Set[T] {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
if len(t.waiters) == 0 {
|
||||
return nil
|
||||
}
|
||||
ret := collections.NewSet[T]()
|
||||
for waiter := range t.waiters {
|
||||
maps.Copy(ret, waiter.pending)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
|
@ -1,87 +0,0 @@
|
|||
// Copyright (c) The OpenTofu Authors
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Copyright (c) 2023 HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package lifecycle
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCompletionTracker(t *testing.T) {
|
||||
ctx := t.Context()
|
||||
|
||||
// This test would be a good candidate for the testing/synctest package,
|
||||
// but at the time of writing it that package hasn't been stablized yet.
|
||||
//
|
||||
// Using testing/synctest would make it possible to use synctest.Wait()
|
||||
// to be sure that the waiter goroutine has had a chance to write to
|
||||
// the "complete" flag before we test it, and so it would not need to
|
||||
// be an atomic.Bool anymore and our test that completing only a subset
|
||||
// of the items doesn't unblock would be reliable rather than best-effort.
|
||||
|
||||
// We'll use strings as the tracking keys here for simplicity's sake,
|
||||
// but the intention is that real callers of this would define their
|
||||
// own types to represent different kinds of trackable objects.
|
||||
tracker := NewCompletionTracker[string]()
|
||||
tracker.ReportCompletion("completed early")
|
||||
|
||||
var complete atomic.Bool
|
||||
waitCh := tracker.NewWaiterFor(slices.Values([]string{
|
||||
"completed early",
|
||||
"completed second",
|
||||
"completed last",
|
||||
}))
|
||||
waiterExitCh := make(chan struct{}) // closed once the goroutine below has finished waiting
|
||||
go func() {
|
||||
select {
|
||||
case <-waitCh:
|
||||
complete.Store(true)
|
||||
t.Log("waiting channel was closed")
|
||||
case <-ctx.Done():
|
||||
// We'll get here if the test finishes before waitCh is closed,
|
||||
// suggesting that something went wrong. We'll just return to
|
||||
// avoid leaking this goroutine, since the surrounding test has
|
||||
// presumably already failed anyway.
|
||||
}
|
||||
close(waiterExitCh)
|
||||
}()
|
||||
|
||||
if complete.Load() {
|
||||
t.Fatal("already complete before we resolved any other items")
|
||||
}
|
||||
t.Log("resolving the second item")
|
||||
tracker.ReportCompletion("completed second")
|
||||
// NOTE: The following is best effort as long as we aren't using
|
||||
// test/synctest, because we can't be sure that the waiting goroutine
|
||||
// has been scheduled long enough to reach the complete.Store(true).
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if complete.Load() {
|
||||
t.Fatal("already complete before we resolved the final item")
|
||||
}
|
||||
t.Log("resolving the final item")
|
||||
tracker.ReportCompletion("completed last")
|
||||
<-waiterExitCh // wait for the waiter goroutine to exit
|
||||
if !complete.Load() {
|
||||
t.Fatal("not complete even though all items are resolved")
|
||||
}
|
||||
|
||||
// creating a waiter for items that have all already completed should
|
||||
// return a channel that's already closed.
|
||||
alreadyCompleteWaitCh := tracker.NewWaiterFor(slices.Values([]string{
|
||||
"completed early",
|
||||
"completed last",
|
||||
}))
|
||||
select {
|
||||
case <-alreadyCompleteWaitCh:
|
||||
// the expected case
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("already-completed waiter channel was not immediately closed")
|
||||
case <-ctx.Done():
|
||||
// test has somehow already exited?! (this should not be possible)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,82 +0,0 @@
|
|||
// Copyright (c) The OpenTofu Authors
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Copyright (c) 2023 HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package planning
|
||||
|
||||
import (
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/engine/lifecycle"
|
||||
"github.com/opentofu/opentofu/internal/states"
|
||||
)
|
||||
|
||||
// completionTracker is the concrete type of [lifecycle.CompletionTracker] we
|
||||
// use to track completion during a planning operation.
|
||||
//
|
||||
// This uses implementations of [completionEvent] to represent the items that
|
||||
// need to be completed.
|
||||
type completionTracker = lifecycle.CompletionTracker[completionEvent]
|
||||
|
||||
func (p *planContext) reportResourceInstancePlanCompletion(addr addrs.AbsResourceInstance) {
|
||||
p.completion.ReportCompletion(resourceInstancePlanningComplete{addr.UniqueKey()})
|
||||
}
|
||||
|
||||
func (p *planContext) reportResourceInstanceDeposedPlanCompletion(addr addrs.AbsResourceInstance, deposedKey states.DeposedKey) {
|
||||
p.completion.ReportCompletion(resourceInstanceDeposedPlanningComplete{addr.UniqueKey(), deposedKey})
|
||||
}
|
||||
|
||||
func (p *planContext) reportProviderInstanceClosed(addr addrs.AbsProviderInstanceCorrect) {
|
||||
p.completion.ReportCompletion(providerInstanceClosed{addr.UniqueKey()})
|
||||
}
|
||||
|
||||
// completionEvent is the type we use to represent events in
|
||||
// a [completionTracker].
|
||||
//
|
||||
// All implementations of this interface must be comparable.
|
||||
type completionEvent interface {
|
||||
completionEvent()
|
||||
}
|
||||
|
||||
// resourceInstancePlanningComplete represents that we've completed planning
|
||||
// of a specific resource instance.
|
||||
//
|
||||
// Provider instances remain open until all of the resource instances that
|
||||
// belong to them have completed planning, and ephemeral resource instances
|
||||
// remain open until all of the other resource instances that depend on them
|
||||
// have completed planning.
|
||||
type resourceInstancePlanningComplete struct {
|
||||
// key MUST be the unique key of an addrs.ResourceInstance.
|
||||
key addrs.UniqueKey
|
||||
}
|
||||
|
||||
// completionEvent implements completionEvent.
|
||||
func (r resourceInstancePlanningComplete) completionEvent() {}
|
||||
|
||||
// resourceInstanceDeposedPlanningComplete is like
|
||||
// [resourceInstancePlanningComplete] but for "deposed" objects, rather than
|
||||
// for "current" objects.
|
||||
type resourceInstanceDeposedPlanningComplete struct {
|
||||
// key MUST be the unique key of an addrs.ResourceInstance.
|
||||
instKey addrs.UniqueKey
|
||||
|
||||
// deposedKey is the DeposedKey of the deposed object whose planning
|
||||
// is being tracked.
|
||||
deposedKey states.DeposedKey
|
||||
}
|
||||
|
||||
// completionEvent implements completionEvent.
|
||||
func (r resourceInstanceDeposedPlanningComplete) completionEvent() {}
|
||||
|
||||
// providerInstanceClosed represents that a previously-opened provider
|
||||
// instance has now been closed.
|
||||
//
|
||||
// Ephemeral resource instances remain open until all of the provider instances
|
||||
// that depend on them have been closed.
|
||||
type providerInstanceClosed struct {
|
||||
// key MUST be the unique key of an addrs.AbsProviderInstanceCorrect.
|
||||
key addrs.UniqueKey
|
||||
}
|
||||
|
||||
// completionEvent implements completionEvent.
|
||||
func (r providerInstanceClosed) completionEvent() {}
|
||||
|
|
@ -45,6 +45,7 @@ type execGraphBuilder struct {
|
|||
resourceInstAddrRefs addrs.Map[addrs.AbsResourceInstance, execgraph.ResultRef[addrs.AbsResourceInstance]]
|
||||
providerInstAddrRefs addrs.Map[addrs.AbsProviderInstanceCorrect, execgraph.ResultRef[addrs.AbsProviderInstanceCorrect]]
|
||||
openProviderRefs addrs.Map[addrs.AbsProviderInstanceCorrect, execResultWithCloseBlockers[*exec.ProviderClient]]
|
||||
openEphemeralRefs addrs.Map[addrs.AbsResourceInstance, registerExecCloseBlockerFunc]
|
||||
}
|
||||
|
||||
// NOTE: There are additional methods for [execGraphBuilder] declared in
|
||||
|
|
@ -57,6 +58,7 @@ func newExecGraphBuilder() *execGraphBuilder {
|
|||
resourceInstAddrRefs: addrs.MakeMap[addrs.AbsResourceInstance, execgraph.ResultRef[addrs.AbsResourceInstance]](),
|
||||
providerInstAddrRefs: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, execgraph.ResultRef[addrs.AbsProviderInstanceCorrect]](),
|
||||
openProviderRefs: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, execResultWithCloseBlockers[*exec.ProviderClient]](),
|
||||
openEphemeralRefs: addrs.MakeMap[addrs.AbsResourceInstance, registerExecCloseBlockerFunc](),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -93,9 +95,7 @@ type registerExecCloseBlockerFunc func(execgraph.AnyResultRef)
|
|||
func (b *execGraphBuilder) makeCloseBlocker() (execgraph.AnyResultRef, registerExecCloseBlockerFunc) {
|
||||
waiter, lowerRegister := b.lower.MutableWaiter()
|
||||
registerFunc := registerExecCloseBlockerFunc(func(ref execgraph.AnyResultRef) {
|
||||
b.mu.Lock()
|
||||
lowerRegister(ref)
|
||||
b.mu.Unlock()
|
||||
})
|
||||
return waiter, registerFunc
|
||||
}
|
||||
|
|
|
|||
59
internal/engine/planning/execgraph_ephemeral.go
Normal file
59
internal/engine/planning/execgraph_ephemeral.go
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
// Copyright (c) The OpenTofu Authors
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Copyright (c) 2023 HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package planning
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/engine/internal/execgraph"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval"
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
)
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// This file contains methods of [execGraphBuilder] that are related to the
|
||||
// parts of an execution graph that deal with resource instances of mode
|
||||
// [addrs.EphemeralResourceMode] in particular.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
// EphemeralResourceSubgraph adds graph nodes needed to apply changes for a
|
||||
// ephemeral resource instance, and returns what should be used as its final
|
||||
// result to propagate into to downstream references.
|
||||
//
|
||||
// TODO: This is definitely not sufficient for the full complexity of all of the
|
||||
// different ways ephemeral resources can potentially need to be handled in an
|
||||
// execution graph. It's just a simple placeholder adapted from code that was
|
||||
// originally written inline in [planGlue.planDesiredEphemeralResourceInstance]
|
||||
// just to preserve the existing functionality for now until we design a more
|
||||
// complete approach in later work.
|
||||
func (b *execGraphBuilder) EphemeralResourceInstanceSubgraph(ctx context.Context, desired *eval.DesiredResourceInstance, plannedValue cty.Value, oracle *eval.PlanningOracle) execgraph.ResourceInstanceResultRef {
|
||||
providerClientRef, closeProviderAfter := b.ProviderInstance(ctx, *desired.ProviderInstance, oracle)
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
closeWait, registerCloseBlocker := b.makeCloseBlocker()
|
||||
b.openEphemeralRefs.Put(desired.Addr, registerCloseBlocker)
|
||||
|
||||
// We need to explicitly model our dependency on any upstream resource
|
||||
// instances in the resource instance graph. These don't naturally emerge
|
||||
// from the data flow because these results are intermediated through the
|
||||
// evaluator, which indirectly incorporates the results into the
|
||||
// desiredInstRef result we'll build below.
|
||||
dependencyWaiter, closeDependencyAfter := b.waiterForResourceInstances(desired.RequiredResourceInstances.All())
|
||||
|
||||
instAddrRef := b.lower.ConstantResourceInstAddr(desired.Addr)
|
||||
desiredInstRef := b.lower.ResourceInstanceDesired(instAddrRef, dependencyWaiter)
|
||||
|
||||
openRef := b.lower.EphemeralOpen(desiredInstRef, providerClientRef)
|
||||
stateRef := b.lower.EphemeralState(openRef)
|
||||
closeRef := b.lower.EphemeralClose(openRef, closeWait)
|
||||
|
||||
closeDependencyAfter(closeRef)
|
||||
closeProviderAfter(closeRef)
|
||||
|
||||
return stateRef
|
||||
}
|
||||
|
|
@ -6,6 +6,8 @@
|
|||
package planning
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/engine/internal/exec"
|
||||
|
|
@ -29,15 +31,18 @@ import (
|
|||
// originally written inline in [planGlue.planDesiredManagedResourceInstance]
|
||||
// just to preserve the existing functionality for now until we design a more
|
||||
// complete approach in later work.
|
||||
func (b *execGraphBuilder) ManagedResourceInstanceSubgraph(desired *eval.DesiredResourceInstance, plannedValue cty.Value) execgraph.ResourceInstanceResultRef {
|
||||
func (b *execGraphBuilder) ManagedResourceInstanceSubgraph(ctx context.Context, desired *eval.DesiredResourceInstance, plannedValue cty.Value, oracle *eval.PlanningOracle) execgraph.ResourceInstanceResultRef {
|
||||
providerClientRef, closeProviderAfter := b.ProviderInstance(ctx, *desired.ProviderInstance, oracle)
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
// We need to explicitly model our dependency on any upstream resource
|
||||
// instances in the resource instance graph. These don't naturally emerge
|
||||
// from the data flow because these results are intermediated through the
|
||||
// evaluator, which indirectly incorporates the results into the
|
||||
// desiredInstRef result we'll build below.
|
||||
dependencyWaiter := b.waiterForResourceInstances(desired.RequiredResourceInstances.All())
|
||||
|
||||
providerClientRef, closeProviderAfter := b.ProviderInstance(*desired.ProviderInstance, b.lower.Waiter())
|
||||
dependencyWaiter, closeDependencyAfter := b.waiterForResourceInstances(desired.RequiredResourceInstances.All())
|
||||
|
||||
// FIXME: If this is one of the "replace" actions then we need to generate
|
||||
// a more complex graph that has two pairs of "final plan" and "apply".
|
||||
|
|
@ -56,7 +61,9 @@ func (b *execGraphBuilder) ManagedResourceInstanceSubgraph(desired *eval.Desired
|
|||
execgraph.NilResultRef[*exec.ResourceInstanceObject](),
|
||||
providerClientRef,
|
||||
)
|
||||
|
||||
closeProviderAfter(finalResultRef)
|
||||
closeDependencyAfter(finalResultRef)
|
||||
|
||||
return finalResultRef
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,9 +6,12 @@
|
|||
package planning
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/engine/internal/exec"
|
||||
"github.com/opentofu/opentofu/internal/engine/internal/execgraph"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval"
|
||||
)
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
|
@ -29,10 +32,17 @@ import (
|
|||
// the graph at once, although each distinct provider instance address gets
|
||||
// only one set of nodes added and then subsequent calls get references to
|
||||
// the same operation results.
|
||||
func (b *execGraphBuilder) ProviderInstance(addr addrs.AbsProviderInstanceCorrect, waitFor execgraph.AnyResultRef) (execgraph.ResultRef[*exec.ProviderClient], registerExecCloseBlockerFunc) {
|
||||
func (b *execGraphBuilder) ProviderInstance(ctx context.Context, addr addrs.AbsProviderInstanceCorrect, oracle *eval.PlanningOracle) (execgraph.ResultRef[*exec.ProviderClient], registerExecCloseBlockerFunc) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
resourceDependencies := addrs.MakeSet[addrs.AbsResourceInstance]()
|
||||
rawDependencies := oracle.ProviderInstanceResourceDependencies(ctx, addr)
|
||||
for dep := range rawDependencies {
|
||||
resourceDependencies.Add(dep.Addr)
|
||||
}
|
||||
dependencyWaiter, closeDependencyAfter := b.waiterForResourceInstances(resourceDependencies.All())
|
||||
|
||||
// FIXME: This is an adaptation of an earlier attempt at this where this
|
||||
// helper was in the underlying execgraph.Builder type, built to work
|
||||
// without any help from the planning engine. But this design isn't
|
||||
|
|
@ -58,14 +68,13 @@ func (b *execGraphBuilder) ProviderInstance(addr addrs.AbsProviderInstanceCorrec
|
|||
if existing, ok := b.openProviderRefs.GetOk(addr); ok {
|
||||
return existing.Result, existing.CloseBlockerFunc
|
||||
}
|
||||
configResult := b.lower.ProviderInstanceConfig(addrResult, waitFor)
|
||||
configResult := b.lower.ProviderInstanceConfig(addrResult, dependencyWaiter)
|
||||
openResult := b.lower.ProviderInstanceOpen(configResult)
|
||||
closeWait, registerCloseBlocker := b.makeCloseBlocker()
|
||||
// Nothing actually depends on the result of the "close" operation, but
|
||||
// eventual execution of the graph will still wait for it to complete
|
||||
// because _all_ operations must complete before execution is considered
|
||||
// to be finished.
|
||||
_ = b.lower.ProviderInstanceClose(openResult, closeWait)
|
||||
|
||||
closeRef := b.lower.ProviderInstanceClose(openResult, closeWait)
|
||||
closeDependencyAfter(closeRef)
|
||||
|
||||
b.openProviderRefs.Put(addr, execResultWithCloseBlockers[*exec.ProviderClient]{
|
||||
Result: openResult,
|
||||
CloseBlockerResult: closeWait,
|
||||
|
|
|
|||
|
|
@ -45,8 +45,6 @@ func (b *execGraphBuilder) SetResourceInstanceFinalStateResult(addr addrs.AbsRes
|
|||
// system that caused the construction of subgraphs for different resource
|
||||
// instances to happen in the wrong order.
|
||||
func (b *execGraphBuilder) resourceInstanceFinalStateResult(addr addrs.AbsResourceInstance) execgraph.AnyResultRef {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
return b.lower.ResourceInstanceFinalStateResult(addr)
|
||||
}
|
||||
|
||||
|
|
@ -60,11 +58,21 @@ func (b *execGraphBuilder) resourceInstanceFinalStateResult(addr addrs.AbsResour
|
|||
// The value of the returned result is not actually meaningful; it's used only
|
||||
// for its blocking behavior to add additional ordering constraints to an
|
||||
// execution graph.
|
||||
func (b *execGraphBuilder) waiterForResourceInstances(instAddrs iter.Seq[addrs.AbsResourceInstance]) execgraph.AnyResultRef {
|
||||
//
|
||||
// The function returned allows callers to ensure any dependency resources
|
||||
// that stay "open" will not be closed until the given references has completed.
|
||||
func (b *execGraphBuilder) waiterForResourceInstances(instAddrs iter.Seq[addrs.AbsResourceInstance]) (execgraph.AnyResultRef, registerExecCloseBlockerFunc) {
|
||||
var dependencyResults []execgraph.AnyResultRef
|
||||
for instAddr := range instAddrs {
|
||||
depInstResult := b.resourceInstanceFinalStateResult(instAddr)
|
||||
dependencyResults = append(dependencyResults, depInstResult)
|
||||
}
|
||||
return b.lower.Waiter(dependencyResults...)
|
||||
|
||||
return b.lower.Waiter(dependencyResults...), func(ref execgraph.AnyResultRef) {
|
||||
for instAddr := range instAddrs {
|
||||
if instAddr.Resource.Resource.Mode == addrs.EphemeralResourceMode {
|
||||
b.openEphemeralRefs.Get(instAddr)(ref)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,9 +74,9 @@ func PlanChanges(ctx context.Context, prevRoundState *states.State, configInst *
|
|||
// here but we'll still produce a best-effort [plans.Plan] describing
|
||||
// the situation because that often gives useful information for debugging
|
||||
// what caused the errors.
|
||||
plan := planCtx.Close()
|
||||
plan, moreDiags := planCtx.Close(ctx)
|
||||
plan.Errored = true
|
||||
return plan, diags
|
||||
return plan, diags.Append(moreDiags)
|
||||
}
|
||||
if evalResult == nil {
|
||||
// This should not happen: we should always have an evalResult if
|
||||
|
|
@ -125,5 +125,10 @@ func PlanChanges(ctx context.Context, prevRoundState *states.State, configInst *
|
|||
}
|
||||
}
|
||||
|
||||
return planCtx.Close(), diags
|
||||
plan, moreDiags := planCtx.Close(ctx)
|
||||
diags = diags.Append(moreDiags)
|
||||
if diags.HasErrors() {
|
||||
plan.Errored = true
|
||||
}
|
||||
return plan, diags
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,15 +6,18 @@
|
|||
package planning
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/engine/lifecycle"
|
||||
"github.com/opentofu/opentofu/internal/engine/plugins"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval"
|
||||
"github.com/opentofu/opentofu/internal/logging"
|
||||
"github.com/opentofu/opentofu/internal/plans"
|
||||
"github.com/opentofu/opentofu/internal/states"
|
||||
"github.com/opentofu/opentofu/internal/tfdiags"
|
||||
)
|
||||
|
||||
// planContext is our shared state for the various parts of a single call
|
||||
|
|
@ -45,15 +48,18 @@ type planContext struct {
|
|||
// of prevRoundState.
|
||||
refreshedState *states.SyncState
|
||||
|
||||
completion *completionTracker
|
||||
providers plugins.Providers
|
||||
|
||||
providerInstances *providerInstances
|
||||
|
||||
providers plugins.Providers
|
||||
|
||||
// TODO: something to track which ephemeral resource instances are currently
|
||||
// open? (Do we actually need that, or can we just rely on a background
|
||||
// goroutine to babysit those based on the completion tracker?)
|
||||
// Stack of ephemeral and provider close functions
|
||||
// Given the current state of the planning engine, we wait until
|
||||
// the end of the run to close all of the "opened" items. We
|
||||
// also need to close them in a specific order to prevent dependency
|
||||
// conflicts. We posit that for plan, closing in the reverse order of opens
|
||||
// will ensure that this order is correctly preserved.
|
||||
closeStackMu sync.Mutex
|
||||
closeStack []func(context.Context) tfdiags.Diagnostics
|
||||
}
|
||||
|
||||
func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, providers plugins.Providers) *planContext {
|
||||
|
|
@ -63,8 +69,6 @@ func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, pro
|
|||
changes := plans.NewChanges()
|
||||
refreshedState := prevRoundState.DeepCopy()
|
||||
|
||||
completion := lifecycle.NewCompletionTracker[completionEvent]()
|
||||
|
||||
execGraphBuilder := newExecGraphBuilder()
|
||||
|
||||
return &planContext{
|
||||
|
|
@ -73,8 +77,7 @@ func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, pro
|
|||
execGraphBuilder: execGraphBuilder,
|
||||
prevRoundState: prevRoundState,
|
||||
refreshedState: refreshedState.SyncWrapper(),
|
||||
completion: completion,
|
||||
providerInstances: newProviderInstances(completion),
|
||||
providerInstances: newProviderInstances(),
|
||||
providers: providers,
|
||||
}
|
||||
}
|
||||
|
|
@ -84,16 +87,8 @@ func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, pro
|
|||
//
|
||||
// After calling this function the [planContext] object is invalid and must
|
||||
// not be used anymore.
|
||||
func (p *planContext) Close() *plans.Plan {
|
||||
// Before we return we'll make sure our completion tracker isn't waiting
|
||||
// for anything else to complete, so that we can unblock closing of
|
||||
// any provider instances or ephemeral resource instances that might've
|
||||
// got left behind by panics/etc. We should not be relying on this in the
|
||||
// happy path.
|
||||
for event := range p.completion.PendingItems() {
|
||||
log.Printf("[TRACE] planContext: synthetic completion of %#v", event)
|
||||
p.completion.ReportCompletion(event)
|
||||
}
|
||||
func (p *planContext) Close(ctx context.Context) (*plans.Plan, tfdiags.Diagnostics) {
|
||||
var diags tfdiags.Diagnostics
|
||||
|
||||
// We'll freeze the execution graph into a serialized form here, so that
|
||||
// we can recover an equivalent execution graph again during the apply
|
||||
|
|
@ -102,6 +97,15 @@ func (p *planContext) Close() *plans.Plan {
|
|||
if logging.IsDebugOrHigher() {
|
||||
log.Println("[DEBUG] Planned execution graph:\n" + logging.Indent(execGraph.DebugRepr()))
|
||||
}
|
||||
|
||||
p.closeStackMu.Lock()
|
||||
defer p.closeStackMu.Unlock()
|
||||
|
||||
slices.Reverse(p.closeStack)
|
||||
for _, closer := range p.closeStack {
|
||||
diags = diags.Append(closer(ctx))
|
||||
}
|
||||
|
||||
execGraphOpaque := execGraph.Marshal()
|
||||
|
||||
return &plans.Plan{
|
||||
|
|
@ -119,5 +123,5 @@ func (p *planContext) Close() *plans.Plan {
|
|||
// graph so we can round-trip it through saved plan files while
|
||||
// the CLI layer is still working in terms of [plans.Plan].
|
||||
ExecutionGraph: execGraphOpaque,
|
||||
}
|
||||
}, diags
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,8 +20,6 @@ import (
|
|||
)
|
||||
|
||||
func (p *planGlue) planDesiredDataResourceInstance(ctx context.Context, inst *eval.DesiredResourceInstance, egb *execGraphBuilder) (cty.Value, execgraph.ResourceInstanceResultRef, tfdiags.Diagnostics) {
|
||||
// Regardless of outcome we'll always report that we completed planning.
|
||||
defer p.planCtx.reportResourceInstancePlanCompletion(inst.Addr)
|
||||
var diags tfdiags.Diagnostics
|
||||
|
||||
validateDiags := p.planCtx.providers.ValidateResourceConfig(ctx, inst.Provider, inst.ResourceMode, inst.ResourceType, inst.ConfigVal)
|
||||
|
|
@ -91,8 +89,6 @@ func (p *planGlue) planDesiredDataResourceInstance(ctx context.Context, inst *ev
|
|||
}
|
||||
|
||||
func (p *planGlue) planOrphanDataResourceInstance(_ context.Context, addr addrs.AbsResourceInstance, state *states.ResourceInstanceObjectFullSrc, egb *execGraphBuilder) tfdiags.Diagnostics {
|
||||
// Regardless of outcome we'll always report that we completed planning.
|
||||
defer p.planCtx.reportResourceInstancePlanCompletion(addr)
|
||||
var diags tfdiags.Diagnostics
|
||||
|
||||
// An orphan data object is always just discarded completely, because
|
||||
|
|
|
|||
|
|
@ -7,24 +7,64 @@ package planning
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/engine/internal/execgraph"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval"
|
||||
"github.com/opentofu/opentofu/internal/shared"
|
||||
"github.com/opentofu/opentofu/internal/tfdiags"
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
)
|
||||
|
||||
func (p *planGlue) planDesiredEphemeralResourceInstance(ctx context.Context, inst *eval.DesiredResourceInstance, egb *execGraphBuilder) (cty.Value, execgraph.ResourceInstanceResultRef, tfdiags.Diagnostics) {
|
||||
// Regardless of outcome we'll always report that we completed planning.
|
||||
defer p.planCtx.reportResourceInstancePlanCompletion(inst.Addr)
|
||||
var diags tfdiags.Diagnostics
|
||||
|
||||
validateDiags := p.planCtx.providers.ValidateResourceConfig(ctx, inst.Provider, inst.Addr.Resource.Resource.Mode, inst.Addr.Resource.Resource.Type, inst.ConfigVal)
|
||||
diags = diags.Append(validateDiags)
|
||||
if diags.HasErrors() {
|
||||
return cty.DynamicVal, nil, diags
|
||||
schema, _ := p.planCtx.providers.ResourceTypeSchema(ctx, inst.Provider, inst.Addr.Resource.Resource.Mode, inst.Addr.Resource.Resource.Type)
|
||||
if schema == nil || schema.Block == nil {
|
||||
// Should be caught during validation, so we don't bother with a pretty error here
|
||||
diags = diags.Append(fmt.Errorf("provider %q does not support ephemeral resource %q", inst.ProviderInstance, inst.Addr.Resource.Resource.Type))
|
||||
return cty.NilVal, nil, diags
|
||||
}
|
||||
|
||||
// TODO: Implement
|
||||
panic("unimplemented")
|
||||
if inst.ProviderInstance == nil {
|
||||
// If we don't even know which provider instance we're supposed to be
|
||||
// talking to then we'll just return a placeholder value, because
|
||||
// we don't have any way to generate a speculative plan.
|
||||
return cty.NilVal, nil, diags
|
||||
}
|
||||
|
||||
providerClient, moreDiags := p.providerClient(ctx, *inst.ProviderInstance)
|
||||
if providerClient == nil {
|
||||
moreDiags = moreDiags.Append(tfdiags.AttributeValue(
|
||||
tfdiags.Error,
|
||||
"Provider instance not available",
|
||||
fmt.Sprintf("Cannot plan %s because its associated provider instance %s cannot initialize.", inst.Addr, *inst.ProviderInstance),
|
||||
nil,
|
||||
))
|
||||
}
|
||||
diags = diags.Append(moreDiags)
|
||||
if moreDiags.HasErrors() {
|
||||
return cty.NilVal, nil, diags
|
||||
}
|
||||
|
||||
newVal, closeFunc, openDiags := shared.OpenEphemeralResourceInstance(
|
||||
ctx,
|
||||
inst.Addr,
|
||||
schema.Block,
|
||||
*inst.ProviderInstance,
|
||||
providerClient,
|
||||
inst.ConfigVal,
|
||||
shared.EphemeralResourceHooks{},
|
||||
)
|
||||
diags = diags.Append(openDiags)
|
||||
if openDiags.HasErrors() {
|
||||
return cty.NilVal, nil, diags
|
||||
}
|
||||
|
||||
p.planCtx.closeStackMu.Lock()
|
||||
p.planCtx.closeStack = append(p.planCtx.closeStack, closeFunc)
|
||||
p.planCtx.closeStackMu.Unlock()
|
||||
|
||||
resRef := egb.EphemeralResourceInstanceSubgraph(ctx, inst, newVal, p.oracle)
|
||||
return newVal, resRef, diags
|
||||
}
|
||||
|
|
|
|||
|
|
@ -297,49 +297,6 @@ func (p *planGlue) providerClient(ctx context.Context, addr addrs.AbsProviderIns
|
|||
return p.planCtx.providerInstances.ProviderClient(ctx, addr, p)
|
||||
}
|
||||
|
||||
// providerInstanceCompletionEvents returns all of the [completionEvent] values
|
||||
// that need to have been reported to the completion tracker before an
|
||||
// instance of the given provider can be closed.
|
||||
func (p *planGlue) providerInstanceCompletionEvents(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) iter.Seq[completionEvent] {
|
||||
return func(yield func(completionEvent) bool) {
|
||||
configUsers := p.oracle.ProviderInstanceUsers(ctx, addr)
|
||||
for _, resourceInstAddr := range configUsers.ResourceInstances {
|
||||
event := resourceInstancePlanningComplete{resourceInstAddr.UniqueKey()}
|
||||
if !yield(event) {
|
||||
return
|
||||
}
|
||||
}
|
||||
// We also need to wait for the completion of anything we can find
|
||||
// in the state, just in case any resource instances are "orphaned"
|
||||
// and in case there are any deposed objects we need to deal with.
|
||||
for _, modState := range p.planCtx.prevRoundState.Modules {
|
||||
for _, resourceState := range modState.Resources {
|
||||
for instKey, instanceState := range resourceState.Instances {
|
||||
resourceInstAddr := resourceState.Addr.Instance(instKey)
|
||||
// FIXME: State is still using the not-quite-right address
|
||||
// types for provider instances, so we'll shim here.
|
||||
providerInstAddr := resourceState.ProviderConfig.InstanceCorrect(instanceState.ProviderKey)
|
||||
if !addr.Equal(providerInstAddr) {
|
||||
continue // not for this provider instance
|
||||
}
|
||||
if instanceState.Current != nil {
|
||||
event := resourceInstancePlanningComplete{resourceInstAddr.UniqueKey()}
|
||||
if !yield(event) {
|
||||
return
|
||||
}
|
||||
}
|
||||
for dk := range instanceState.Deposed {
|
||||
event := resourceInstanceDeposedPlanningComplete{resourceInstAddr.UniqueKey(), dk}
|
||||
if !yield(event) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *planGlue) desiredResourceInstanceMustBeDeferred(inst *eval.DesiredResourceInstance) bool {
|
||||
// There are various reasons why we might need to defer final planning
|
||||
// of this to a later round. The following is not exhaustive but is a
|
||||
|
|
|
|||
|
|
@ -23,9 +23,6 @@ import (
|
|||
)
|
||||
|
||||
func (p *planGlue) planDesiredManagedResourceInstance(ctx context.Context, inst *eval.DesiredResourceInstance, egb *execGraphBuilder) (plannedVal cty.Value, applyResultRef execgraph.ResourceInstanceResultRef, diags tfdiags.Diagnostics) {
|
||||
// Regardless of outcome we'll always report that we completed planning.
|
||||
defer p.planCtx.reportResourceInstancePlanCompletion(inst.Addr)
|
||||
|
||||
// There are various reasons why we might need to defer final planning
|
||||
// of this to a later round. The following is not exhaustive but is a
|
||||
// placeholder to show where deferral might fit in.
|
||||
|
|
@ -262,24 +259,18 @@ func (p *planGlue) planDesiredManagedResourceInstance(ctx context.Context, inst
|
|||
// and reasonable. In particular, these subgraph-building methods should
|
||||
// be easily unit-testable due to not depending on anything other than
|
||||
// their input.
|
||||
finalResultRef := egb.ManagedResourceInstanceSubgraph(inst, planResp.PlannedState)
|
||||
finalResultRef := egb.ManagedResourceInstanceSubgraph(ctx, inst, planResp.PlannedState, p.oracle)
|
||||
|
||||
// Our result value for ongoing downstream planning is the planned new state.
|
||||
return planResp.PlannedState, finalResultRef, diags
|
||||
}
|
||||
|
||||
func (p *planGlue) planOrphanManagedResourceInstance(ctx context.Context, addr addrs.AbsResourceInstance, state *states.ResourceInstanceObjectFullSrc, egb *execGraphBuilder) tfdiags.Diagnostics {
|
||||
// Regardless of outcome we'll always report that we completed planning.
|
||||
defer p.planCtx.reportResourceInstancePlanCompletion(addr)
|
||||
|
||||
// TODO: Implement
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (p *planGlue) planDeposedManagedResourceInstanceObject(ctx context.Context, addr addrs.AbsResourceInstance, deposedKey states.DeposedKey, state *states.ResourceInstanceObjectFullSrc, egb *execGraphBuilder) tfdiags.Diagnostics {
|
||||
// Regardless of outcome we'll always report that we completed planning.
|
||||
defer p.planCtx.reportResourceInstanceDeposedPlanCompletion(addr, deposedKey)
|
||||
|
||||
// TODO: Implement
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,17 +29,11 @@ type providerInstances struct {
|
|||
// before waiting on an object retrieved from it.
|
||||
active addrs.Map[addrs.AbsProviderInstanceCorrect, *grapheval.Once[providers.Configured]]
|
||||
activeMu sync.Mutex
|
||||
|
||||
// completion is a [completionTracker] that's shared with the [planCtx]
|
||||
// we belong to so that we can detect when all of the work of each particular
|
||||
// provider instance has completed.
|
||||
completion *completionTracker
|
||||
}
|
||||
|
||||
func newProviderInstances(completion *completionTracker) *providerInstances {
|
||||
func newProviderInstances() *providerInstances {
|
||||
return &providerInstances{
|
||||
active: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, *grapheval.Once[providers.Configured]](),
|
||||
completion: completion,
|
||||
active: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, *grapheval.Once[providers.Configured]](),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -69,6 +63,7 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP
|
|||
pi.activeMu.Unlock()
|
||||
|
||||
oracle := planGlue.oracle
|
||||
planCtx := planGlue.planCtx
|
||||
once := pi.active.Get(addr)
|
||||
return once.Do(ctx, func(ctx context.Context) (providers.Configured, tfdiags.Diagnostics) {
|
||||
configVal := oracle.ProviderInstanceConfig(ctx, addr)
|
||||
|
|
@ -81,6 +76,7 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP
|
|||
// be performed.
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// If _this_ call fails then unfortunately we'll end up duplicating
|
||||
// its diagnostics for every resource instance that depends on this
|
||||
// provider instance, which is not ideal but we don't currently have
|
||||
|
|
@ -92,33 +88,29 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP
|
|||
// then this should return "nil, nil" in the error case so that the
|
||||
// caller will treat it the same as a "configuration not valid enough"
|
||||
// problem.
|
||||
ret, diags := planGlue.planCtx.providers.NewConfiguredProvider(ctx, addr.Config.Config.Provider, configVal)
|
||||
ret, diags := planCtx.providers.NewConfiguredProvider(ctx, addr.Config.Config.Provider, configVal)
|
||||
|
||||
closeCh := make(chan struct{})
|
||||
|
||||
planCtx.closeStackMu.Lock()
|
||||
planCtx.closeStack = append(planCtx.closeStack, func(ctx context.Context) tfdiags.Diagnostics {
|
||||
println("CLOSING PROVIDER " + addr.String())
|
||||
closeCh <- struct{}{}
|
||||
return tfdiags.Diagnostics{}.Append(ret.Close(ctx))
|
||||
})
|
||||
planCtx.closeStackMu.Unlock()
|
||||
|
||||
// This background goroutine deals with closing the provider once it's
|
||||
// no longer needed, and with asking it to gracefully stop if our
|
||||
// given context is cancelled.
|
||||
waitCh := pi.completion.NewWaiterFor(planGlue.providerInstanceCompletionEvents(ctx, addr))
|
||||
go func() {
|
||||
// Once this goroutine is complete the provider instance should be
|
||||
// treated as closed.
|
||||
defer planGlue.planCtx.reportProviderInstanceClosed(addr)
|
||||
|
||||
cancelCtx := ctx
|
||||
withoutCancelCtx := context.WithoutCancel(ctx)
|
||||
for {
|
||||
select {
|
||||
case <-waitCh:
|
||||
// Everything that we were expecting to use the provider
|
||||
// instance has now completed, so we can close it.
|
||||
//
|
||||
// (An error from this goes nowhere. If we want to track
|
||||
// this then maybe we _should_ switch to having a central
|
||||
// diags repository inside the providerInstances object,
|
||||
// as discussed above for failing NewConfiguredProvider,
|
||||
// and then we could write this failure into there.)
|
||||
if ret != nil {
|
||||
_ = ret.Close(withoutCancelCtx)
|
||||
}
|
||||
case <-closeCh:
|
||||
// Close() has been called from within the closers
|
||||
// No further actions are nessesary
|
||||
return
|
||||
case <-cancelCtx.Done():
|
||||
// If the context we were given is cancelled then we'll
|
||||
|
|
@ -128,10 +120,7 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP
|
|||
if ret != nil {
|
||||
_ = ret.Stop(withoutCancelCtx)
|
||||
}
|
||||
// We'll now replace cancelCtx with the one guaranteed
|
||||
// to never be cancelled so that we'll block until waitCh
|
||||
// is closed.
|
||||
cancelCtx = withoutCancelCtx
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
|||
|
|
@ -110,8 +110,9 @@ func (n *newRuntimePlugins) NewConfiguredProvider(ctx context.Context, provider
|
|||
return nil, diags
|
||||
}
|
||||
|
||||
unmarkedConfigVal, _ := configVal.UnmarkDeep()
|
||||
resp := inst.ConfigureProvider(ctx, providers.ConfigureProviderRequest{
|
||||
Config: configVal,
|
||||
Config: unmarkedConfigVal,
|
||||
|
||||
// We aren't actually Terraform, so we'll just pretend to be a
|
||||
// Terraform version that has roughly the same functionality that
|
||||
|
|
@ -197,8 +198,9 @@ func (n *newRuntimePlugins) ValidateProviderConfig(ctx context.Context, provider
|
|||
return diags
|
||||
}
|
||||
|
||||
unmarkedConfigVal, _ := configVal.UnmarkDeep()
|
||||
resp := inst.ValidateProviderConfig(ctx, providers.ValidateProviderConfigRequest{
|
||||
Config: configVal,
|
||||
Config: unmarkedConfigVal,
|
||||
})
|
||||
diags = diags.Append(resp.Diagnostics)
|
||||
return diags
|
||||
|
|
|
|||
|
|
@ -115,15 +115,12 @@ type PlanGlue interface {
|
|||
// actions to destroy any instances that are currently tracked but no longer
|
||||
// configured.
|
||||
func (c *ConfigInstance) DrivePlanning(ctx context.Context, buildGlue func(*PlanningOracle) PlanGlue) (*PlanningResult, tfdiags.Diagnostics) {
|
||||
var diags tfdiags.Diagnostics
|
||||
|
||||
// All of our work will be associated with a workgraph worker that serves
|
||||
// as the initial worker node in the work graph.
|
||||
ctx = grapheval.ContextWithNewWorker(ctx)
|
||||
|
||||
relationships, diags := c.prepareToPlan(ctx)
|
||||
if diags.HasErrors() {
|
||||
return nil, diags
|
||||
}
|
||||
|
||||
// We have a little chicken vs. egg problem here where we can't fully
|
||||
// initialize the oracle until we've built the root module instance,
|
||||
// so we initially pass an intentionally-invalid oracle to the build
|
||||
|
|
@ -142,7 +139,6 @@ func (c *ConfigInstance) DrivePlanning(ctx context.Context, buildGlue func(*Plan
|
|||
}
|
||||
// We can now initialize the planning oracle, before we start evaluating
|
||||
// anything that might cause calls to the evalGlue object.
|
||||
oracle.relationships = relationships
|
||||
oracle.rootModuleInstance = rootModuleInstance
|
||||
oracle.evalContext = c.evalContext
|
||||
|
||||
|
|
|
|||
|
|
@ -7,11 +7,12 @@ package eval
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"iter"
|
||||
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval/internal/configgraph"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval/internal/evalglue"
|
||||
"github.com/opentofu/opentofu/internal/lang/grapheval"
|
||||
)
|
||||
|
|
@ -19,8 +20,6 @@ import (
|
|||
// A PlanningOracle provides information from the configuration that is needed
|
||||
// by the planning engine to help orchestrate the planning process.
|
||||
type PlanningOracle struct {
|
||||
relationships *ResourceRelationships
|
||||
|
||||
// NOTE: Any method of PlanningOracle that interacts with methods of
|
||||
// this or anything accessible through it MUST use
|
||||
// [grapheval.ContextWithNewWorker] to make sure it's using a
|
||||
|
|
@ -63,50 +62,14 @@ func (o *PlanningOracle) ProviderInstanceConfig(ctx context.Context, addr addrs.
|
|||
return ret
|
||||
}
|
||||
|
||||
// ProviderInstanceUsers returns an object representing which resource instances
|
||||
// are associated with the provider instance that has the given address.
|
||||
//
|
||||
// The planning phase must keep the provider open at least long enough for
|
||||
// all of the reported resource instances to be planned.
|
||||
//
|
||||
// Note that the planning engine will need to plan destruction of any resource
|
||||
// instances that aren't in the desired state once
|
||||
// [ConfigInstance.DrivePlanning] returns, and provider instances involved in
|
||||
// those followup steps will need to remain open until that other work is
|
||||
// done. This package is not concerned with those details; that's the planning
|
||||
// engine's responsibility.
|
||||
func (o *PlanningOracle) ProviderInstanceUsers(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) ProviderInstanceUsers {
|
||||
func (o *PlanningOracle) ProviderInstanceResourceDependencies(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) iter.Seq[*configgraph.ResourceInstance] {
|
||||
ctx = grapheval.ContextWithNewWorker(ctx)
|
||||
_ = ctx // not using this right now, but keeping this to remind future maintainers that we'd need this
|
||||
|
||||
return o.relationships.ProviderInstanceUsers.Get(addr)
|
||||
}
|
||||
|
||||
// EphemeralResourceInstanceUsers returns an object describing which other
|
||||
// resource instances and providers rely on the result value of the ephemeral
|
||||
// resource with the given address.
|
||||
//
|
||||
// The planning phase must keep the ephemeral resource instance open at least
|
||||
// long enough for all of the reported resource instances to be planned and
|
||||
// for all of the reported provider instances to be closed.
|
||||
//
|
||||
// The given address must be for an ephemeral resource instance or this function
|
||||
// will panic.
|
||||
//
|
||||
// Note that the planning engine will need to plan destruction of any resource
|
||||
// instances that aren't in the desired state once
|
||||
// [ConfigInstance.DrivePlanning] returns, and provider instances involved in
|
||||
// those followup steps might be included in a result from this method, in
|
||||
// which case the planning phase must hold the provider instance open long
|
||||
// enough to complete those followup steps.
|
||||
func (o *PlanningOracle) EphemeralResourceInstanceUsers(ctx context.Context, addr addrs.AbsResourceInstance) EphemeralResourceInstanceUsers {
|
||||
ctx = grapheval.ContextWithNewWorker(ctx)
|
||||
_ = ctx // not using this right now, but keeping this to remind future maintainers that we'd need this
|
||||
|
||||
if addr.Resource.Resource.Mode != addrs.EphemeralResourceMode {
|
||||
panic(fmt.Sprintf("EphemeralResourceInstanceUsers with non-ephemeral %s", addr))
|
||||
providerInst := evalglue.ProviderInstance(ctx, o.rootModuleInstance, addr)
|
||||
if providerInst == nil {
|
||||
return nil
|
||||
}
|
||||
return o.relationships.EphemeralResourceUsers.Get(addr)
|
||||
return providerInst.ResourceInstanceDependencies(ctx)
|
||||
}
|
||||
|
||||
func (o *PlanningOracle) EvalContext(ctx context.Context) *EvalContext {
|
||||
|
|
|
|||
|
|
@ -13,123 +13,10 @@ import (
|
|||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval/internal/configgraph"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval/internal/evalglue"
|
||||
"github.com/opentofu/opentofu/internal/lang/grapheval"
|
||||
"github.com/opentofu/opentofu/internal/plans/objchange"
|
||||
"github.com/opentofu/opentofu/internal/tfdiags"
|
||||
)
|
||||
|
||||
// prepareToPlan implements an extra preparation phase we perform before
|
||||
// running the main plan phase so we can deal with the "chicken or the egg"
|
||||
// problem of needing to evaluate configuraton to learn the relationships
|
||||
// between resources instances and provider instances but needing to already
|
||||
// know those relationships in order to fully evaluate the configuration.
|
||||
//
|
||||
// As a compromise then, we initially perform a more conservative walk that
|
||||
// just treats all resource instances as having fully-unknown values so that
|
||||
// we don't need to configure any providers or open any ephemeral resource
|
||||
// instances, and then ask the resulting configgraph objects to report
|
||||
// the resource/provider dependencies which we can then use on a subsequent
|
||||
// re-eval that includes the real provider planning actions.
|
||||
//
|
||||
// This approach relies on the idea that an evaluation with more unknown
|
||||
// values should always produce a superset of the true dependencies that
|
||||
// would be reported once there are fewer unknown values, and so the
|
||||
// result of this function should capture _at least_ the dependencies
|
||||
// required to successfully plan, possibly also including some harmless
|
||||
// additional dependency relationships that aren't strictly needed once we can
|
||||
// evaluate using real resource planning results. The planning phase will
|
||||
// then be able to produce its own tighter version of this information to
|
||||
// use when building the execution graph for the apply phase.
|
||||
func (c *ConfigInstance) prepareToPlan(ctx context.Context) (*ResourceRelationships, tfdiags.Diagnostics) {
|
||||
// All of our work will be associated with a workgraph worker that serves
|
||||
// as the initial worker node in the work graph.
|
||||
ctx = grapheval.ContextWithNewWorker(ctx)
|
||||
|
||||
rootModuleInstance, diags := c.precheckedModuleInstance(ctx)
|
||||
if diags.HasErrors() {
|
||||
return nil, diags
|
||||
}
|
||||
ret := &ResourceRelationships{
|
||||
EphemeralResourceUsers: addrs.MakeMap[addrs.AbsResourceInstance, EphemeralResourceInstanceUsers](),
|
||||
ProviderInstanceUsers: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, ProviderInstanceUsers](),
|
||||
}
|
||||
for depender := range evalglue.ResourceInstancesDeep(ctx, rootModuleInstance) {
|
||||
dependerAddr := depender.Addr
|
||||
for dependee := range depender.ResourceInstanceDependencies(ctx) {
|
||||
dependeeAddr := dependee.Addr
|
||||
if dependeeAddr.Resource.Resource.Mode == addrs.EphemeralResourceMode {
|
||||
if !ret.EphemeralResourceUsers.Has(dependeeAddr) {
|
||||
ret.EphemeralResourceUsers.Put(dependeeAddr, EphemeralResourceInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet[addrs.AbsResourceInstance](),
|
||||
ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](),
|
||||
})
|
||||
}
|
||||
set := ret.EphemeralResourceUsers.Get(dependeeAddr).ResourceInstances
|
||||
set.Add(dependerAddr)
|
||||
}
|
||||
}
|
||||
providerInst, _, _ := depender.ProviderInstance(ctx)
|
||||
if providerInst, known := configgraph.GetKnown(providerInst); known {
|
||||
providerInstAddr := providerInst.Addr
|
||||
if !ret.ProviderInstanceUsers.Has(providerInstAddr) {
|
||||
ret.ProviderInstanceUsers.Put(providerInstAddr, ProviderInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet[addrs.AbsResourceInstance](),
|
||||
})
|
||||
}
|
||||
set := ret.ProviderInstanceUsers.Get(providerInstAddr).ResourceInstances
|
||||
set.Add(dependerAddr)
|
||||
}
|
||||
}
|
||||
for depender := range evalglue.ProviderInstancesDeep(ctx, rootModuleInstance) {
|
||||
dependerAddr := depender.Addr
|
||||
for dependee := range depender.ResourceInstanceDependencies(ctx) {
|
||||
dependeeAddr := dependee.Addr
|
||||
if dependeeAddr.Resource.Resource.Mode == addrs.EphemeralResourceMode {
|
||||
if !ret.EphemeralResourceUsers.Has(dependeeAddr) {
|
||||
ret.EphemeralResourceUsers.Put(dependeeAddr, EphemeralResourceInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet[addrs.AbsResourceInstance](),
|
||||
ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](),
|
||||
})
|
||||
}
|
||||
set := ret.EphemeralResourceUsers.Get(dependeeAddr).ProviderInstances
|
||||
set.Add(dependerAddr)
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret, diags
|
||||
}
|
||||
|
||||
type ResourceRelationships struct {
|
||||
// EphemeralResourceUsers is a map from ephemeral resource instance
|
||||
// addresses to the sets of addresses of other resource instances (of
|
||||
// any mode, including other ephemeral ones) which depend on them.
|
||||
//
|
||||
// A subsequent plan phase can use this to detect when all of the
|
||||
// downstream users of an ephemeral resource instance have finished
|
||||
// their work and so it's okay to close the ephemeral resource instance.
|
||||
//
|
||||
// TODO: This should also capture the provider instances that are depending
|
||||
// on each ephemeral resource instance.
|
||||
EphemeralResourceUsers addrs.Map[addrs.AbsResourceInstance, EphemeralResourceInstanceUsers]
|
||||
|
||||
// EphemeralResourceUsers is a map from provider instance addresses to the
|
||||
// sets of addresses of resource instances which a provided by them.
|
||||
//
|
||||
// A subsequent plan phase can use this to detect when all of the
|
||||
// downstream users of a provider instance have finished their work and so
|
||||
// it's okay to close the provider instance.
|
||||
ProviderInstanceUsers addrs.Map[addrs.AbsProviderInstanceCorrect, ProviderInstanceUsers]
|
||||
}
|
||||
|
||||
type EphemeralResourceInstanceUsers struct {
|
||||
ResourceInstances addrs.Set[addrs.AbsResourceInstance]
|
||||
ProviderInstances addrs.Set[addrs.AbsProviderInstanceCorrect]
|
||||
}
|
||||
|
||||
type ProviderInstanceUsers struct {
|
||||
ResourceInstances addrs.Set[addrs.AbsResourceInstance]
|
||||
}
|
||||
|
||||
// precheckedModuleInstance deals with the common part of both
|
||||
// [ConfigInstance.prepareToPlan] and [ConfigInstance.Validate], where we
|
||||
// evaluate the configuration using unknown value placeholders for resource
|
||||
|
|
|
|||
|
|
@ -1,391 +0,0 @@
|
|||
// Copyright (c) The OpenTofu Authors
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Copyright (c) 2023 HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package eval
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/configs"
|
||||
"github.com/opentofu/opentofu/internal/configs/configschema"
|
||||
"github.com/opentofu/opentofu/internal/lang/eval/internal/evalglue"
|
||||
"github.com/opentofu/opentofu/internal/providers"
|
||||
)
|
||||
|
||||
// NOTE: Unlike many of the _test.go files in this package, this one is in
|
||||
// "package eval" itself rather than in "package eval_test", because it's
|
||||
// testing the "prepareToPlan" implementation detail that isn't part of the
|
||||
// public API.
|
||||
//
|
||||
// If you bring test code from other files into here then you'll probably
|
||||
// need to remove some "eval." prefixes from references to avoid making this
|
||||
// package import itself.
|
||||
|
||||
func TestPrepare_ephemeralResourceUsers(t *testing.T) {
|
||||
configInst, diags := NewConfigInstance(t.Context(), &ConfigCall{
|
||||
EvalContext: evalglue.EvalContextForTesting(t, &EvalContext{
|
||||
Modules: ModulesForTesting(map[addrs.ModuleSourceLocal]*configs.Module{
|
||||
addrs.ModuleSourceLocal("."): configs.ModuleFromStringForTesting(t, `
|
||||
terraform {
|
||||
required_providers {
|
||||
foo = {
|
||||
source = "test/foo"
|
||||
}
|
||||
}
|
||||
}
|
||||
ephemeral "foo" "a" {
|
||||
count = 2
|
||||
|
||||
name = "a ${count.index}"
|
||||
}
|
||||
ephemeral "foo" "b" {
|
||||
count = 2
|
||||
|
||||
name = ephemeral.foo.a[count.index].id
|
||||
}
|
||||
locals {
|
||||
# This is intentionally a more complex expression
|
||||
# to analyze, to prove that we can still chase the
|
||||
# instance-specific references through it.
|
||||
# This produces a tuple of two-element tuples with the
|
||||
# corresponding ids of ephemeral.foo.a and
|
||||
# ephemeral.foo.b respectively.
|
||||
together = [
|
||||
for i, a in ephemeral.foo.a :
|
||||
[a.id, ephemeral.foo.b[i].id]
|
||||
]
|
||||
}
|
||||
resource "foo" "c" {
|
||||
count = 2
|
||||
|
||||
# Even indirectly through this projection of values
|
||||
# from the two ephemeral resources we should correctly
|
||||
# detect that foo.c instances are correlated with
|
||||
# ephemeral.foo.a and ephemeral.foo.b instances of
|
||||
# the same index.
|
||||
something = local.together[count.index]
|
||||
|
||||
# The above is really just an overly-complicated way of
|
||||
# writing this:
|
||||
#
|
||||
# something = [
|
||||
# ephemeral.foo.a[count.index],
|
||||
# ephemeral.foo.b[count.index],
|
||||
# ]
|
||||
}
|
||||
provider "foo" {
|
||||
alias = "other"
|
||||
|
||||
name = ephemeral.foo.a[0].name
|
||||
}
|
||||
`),
|
||||
}),
|
||||
Providers: ProvidersForTesting(map[addrs.Provider]*providers.GetProviderSchemaResponse{
|
||||
addrs.MustParseProviderSourceString("test/foo"): {
|
||||
Provider: providers.Schema{
|
||||
Block: &configschema.Block{
|
||||
Attributes: map[string]*configschema.Attribute{
|
||||
"name": {
|
||||
Type: cty.String,
|
||||
Optional: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
EphemeralResources: map[string]providers.Schema{
|
||||
"foo": {
|
||||
Block: &configschema.Block{
|
||||
Attributes: map[string]*configschema.Attribute{
|
||||
"name": {
|
||||
Type: cty.String,
|
||||
Required: true,
|
||||
},
|
||||
"id": {
|
||||
Type: cty.String,
|
||||
Computed: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ResourceTypes: map[string]providers.Schema{
|
||||
"foo": {
|
||||
Block: &configschema.Block{
|
||||
Attributes: map[string]*configschema.Attribute{
|
||||
"something": {
|
||||
Type: cty.List(cty.String),
|
||||
Optional: true,
|
||||
WriteOnly: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
}),
|
||||
RootModuleSource: addrs.ModuleSourceLocal("."),
|
||||
InputValues: InputValuesForTesting(map[string]cty.Value{}),
|
||||
})
|
||||
if diags.HasErrors() {
|
||||
t.Fatalf("unexpected errors: %s", diags.Err())
|
||||
}
|
||||
|
||||
got, diags := configInst.prepareToPlan(t.Context())
|
||||
if diags.HasErrors() {
|
||||
t.Fatalf("unexpected errors: %s", diags.Err())
|
||||
}
|
||||
|
||||
fooA := addrs.Resource{
|
||||
Mode: addrs.EphemeralResourceMode,
|
||||
Type: "foo",
|
||||
Name: "a",
|
||||
}.Absolute(addrs.RootModuleInstance)
|
||||
fooB := addrs.Resource{
|
||||
Mode: addrs.EphemeralResourceMode,
|
||||
Type: "foo",
|
||||
Name: "b",
|
||||
}.Absolute(addrs.RootModuleInstance)
|
||||
fooC := addrs.Resource{
|
||||
Mode: addrs.ManagedResourceMode,
|
||||
Type: "foo",
|
||||
Name: "c",
|
||||
}.Absolute(addrs.RootModuleInstance)
|
||||
inst0 := addrs.IntKey(0)
|
||||
inst1 := addrs.IntKey(1)
|
||||
providerInstAddr := addrs.AbsProviderConfigCorrect{
|
||||
Module: addrs.RootModuleInstance,
|
||||
Config: addrs.ProviderConfigCorrect{
|
||||
Provider: addrs.MustParseProviderSourceString("test/foo"),
|
||||
},
|
||||
}.Instance(addrs.NoKey)
|
||||
providerOtherInstAddr := addrs.AbsProviderConfigCorrect{
|
||||
Module: addrs.RootModuleInstance,
|
||||
Config: addrs.ProviderConfigCorrect{
|
||||
Provider: addrs.MustParseProviderSourceString("test/foo"),
|
||||
Alias: "other",
|
||||
},
|
||||
}.Instance(addrs.NoKey)
|
||||
|
||||
// The analysis should detect that:
|
||||
// - ephemeral.foo.a[0] is used by ephemeral.foo.b[0] and foo.c[0], and by the foo.other provider instance
|
||||
// - ephemeral.foo.a[1] is used by ephemeral.foo.b[1] and foo.c[1]
|
||||
// - ephemeral.foo.b[0] is used by only foo.c[0]
|
||||
// - ephemeral.foo.b[1] is used by only foo.c[1]
|
||||
// In particular, the evaluator should be able to notice that
|
||||
// only the correlated instance keys have any relationship between
|
||||
// them, and so e.g. ephemeral.foo.a[0] is NOT used by ephemeral.foo.b[1].
|
||||
//
|
||||
// This level of precision was not possible in the traditional
|
||||
// "package tofu" language runtime, because it calculated dependencies
|
||||
// based only on static analysis, but this new evaluator uses dynamic
|
||||
// analysis. Refer to [configgraph.ContributingResourceInstances]
|
||||
// to learn more about how that's meant to work, if you're trying to
|
||||
// debug a regression here that made the analysis less precise.
|
||||
want := &ResourceRelationships{
|
||||
// Note that this field captures _inverse_ dependencies: the values
|
||||
// are instances that depend on the keys.
|
||||
//
|
||||
// The best way to understand this is that the ephemeral resource
|
||||
// instance identified in an element's key must remain "open" until all
|
||||
// of the instances identified in the element's value have finished
|
||||
// planning.
|
||||
EphemeralResourceUsers: addrs.MakeMap(
|
||||
addrs.MakeMapElem(fooA.Instance(inst0), EphemeralResourceInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet(
|
||||
fooB.Instance(inst0),
|
||||
fooC.Instance(inst0),
|
||||
),
|
||||
ProviderInstances: addrs.MakeSet(
|
||||
providerOtherInstAddr,
|
||||
),
|
||||
}),
|
||||
addrs.MakeMapElem(fooA.Instance(inst1), EphemeralResourceInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet(
|
||||
fooB.Instance(inst1),
|
||||
fooC.Instance(inst1),
|
||||
),
|
||||
ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](),
|
||||
}),
|
||||
addrs.MakeMapElem(fooB.Instance(inst0), EphemeralResourceInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet(
|
||||
fooC.Instance(inst0),
|
||||
),
|
||||
ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](),
|
||||
}),
|
||||
addrs.MakeMapElem(fooB.Instance(inst1), EphemeralResourceInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet(
|
||||
fooC.Instance(inst1),
|
||||
),
|
||||
ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](),
|
||||
}),
|
||||
),
|
||||
|
||||
// PrepareToPlan also finds the resources that belong to each
|
||||
// provider instance, which is not the focus of this test but
|
||||
// are part of the result nonetheless.
|
||||
ProviderInstanceUsers: addrs.MakeMap(
|
||||
addrs.MakeMapElem(providerInstAddr, ProviderInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet(
|
||||
fooA.Instance(inst0),
|
||||
fooA.Instance(inst1),
|
||||
fooB.Instance(inst0),
|
||||
fooB.Instance(inst1),
|
||||
fooC.Instance(inst0),
|
||||
fooC.Instance(inst1),
|
||||
),
|
||||
}),
|
||||
),
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Error("wrong result\n" + diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrepare_crossModuleReferences(t *testing.T) {
|
||||
configInst, diags := NewConfigInstance(t.Context(), &ConfigCall{
|
||||
EvalContext: evalglue.EvalContextForTesting(t, &EvalContext{
|
||||
Modules: ModulesForTesting(map[addrs.ModuleSourceLocal]*configs.Module{
|
||||
addrs.ModuleSourceLocal("."): configs.ModuleFromStringForTesting(t, `
|
||||
module "a" {
|
||||
source = "./a"
|
||||
}
|
||||
module "b" {
|
||||
source = "./b"
|
||||
|
||||
name = module.a.name
|
||||
}
|
||||
`),
|
||||
addrs.ModuleSourceLocal("./a"): configs.ModuleFromStringForTesting(t, `
|
||||
terraform {
|
||||
required_providers {
|
||||
foo = {
|
||||
source = "test/foo"
|
||||
}
|
||||
}
|
||||
}
|
||||
provider "foo" {}
|
||||
ephemeral "foo" "a" {
|
||||
name = "a"
|
||||
}
|
||||
output "name" {
|
||||
value = ephemeral.foo.a.name
|
||||
}
|
||||
`),
|
||||
addrs.ModuleSourceLocal("./b"): configs.ModuleFromStringForTesting(t, `
|
||||
terraform {
|
||||
required_providers {
|
||||
foo = {
|
||||
source = "test/foo"
|
||||
}
|
||||
}
|
||||
}
|
||||
provider "foo" {}
|
||||
variable "name" {
|
||||
type = string
|
||||
ephemeral = true
|
||||
}
|
||||
resource "foo" "b" {
|
||||
name = var.name
|
||||
}
|
||||
`),
|
||||
}),
|
||||
Providers: ProvidersForTesting(map[addrs.Provider]*providers.GetProviderSchemaResponse{
|
||||
addrs.MustParseProviderSourceString("test/foo"): {
|
||||
Provider: providers.Schema{
|
||||
Block: &configschema.Block{},
|
||||
},
|
||||
EphemeralResources: map[string]providers.Schema{
|
||||
"foo": {
|
||||
Block: &configschema.Block{
|
||||
Attributes: map[string]*configschema.Attribute{
|
||||
"name": {
|
||||
Type: cty.String,
|
||||
Required: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ResourceTypes: map[string]providers.Schema{
|
||||
"foo": {
|
||||
Block: &configschema.Block{
|
||||
Attributes: map[string]*configschema.Attribute{
|
||||
"name": {
|
||||
Type: cty.String,
|
||||
Optional: true,
|
||||
WriteOnly: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
}),
|
||||
RootModuleSource: addrs.ModuleSourceLocal("."),
|
||||
InputValues: InputValuesForTesting(map[string]cty.Value{}),
|
||||
})
|
||||
if diags.HasErrors() {
|
||||
t.Fatalf("unexpected errors: %s", diags.Err())
|
||||
}
|
||||
|
||||
got, diags := configInst.prepareToPlan(t.Context())
|
||||
if diags.HasErrors() {
|
||||
t.Fatalf("unexpected errors: %s", diags.Err())
|
||||
}
|
||||
|
||||
fooA := addrs.Resource{
|
||||
Mode: addrs.EphemeralResourceMode,
|
||||
Type: "foo",
|
||||
Name: "a",
|
||||
}.Absolute(addrs.RootModuleInstance.Child("a", addrs.NoKey))
|
||||
fooB := addrs.Resource{
|
||||
Mode: addrs.ManagedResourceMode,
|
||||
Type: "foo",
|
||||
Name: "b",
|
||||
}.Absolute(addrs.RootModuleInstance.Child("b", addrs.NoKey))
|
||||
providerInstAddr := addrs.AbsProviderConfigCorrect{
|
||||
Module: addrs.RootModuleInstance,
|
||||
Config: addrs.ProviderConfigCorrect{
|
||||
Provider: addrs.MustParseProviderSourceString("test/foo"),
|
||||
},
|
||||
}.Instance(addrs.NoKey)
|
||||
|
||||
// The analyzer should detect that foo.b in module.b depends on
|
||||
// ephemeral.foo.a in module.a even though they are declared in
|
||||
// different modules.
|
||||
want := &ResourceRelationships{
|
||||
EphemeralResourceUsers: addrs.MakeMap(
|
||||
addrs.MakeMapElem(fooA.Instance(addrs.NoKey), EphemeralResourceInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet(
|
||||
fooB.Instance(addrs.NoKey),
|
||||
),
|
||||
ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](),
|
||||
}),
|
||||
),
|
||||
|
||||
// PrepareToPlan also finds the resources that belong to each
|
||||
// provider instance, which is not the focus of this test but
|
||||
// are part of the result nonetheless.
|
||||
ProviderInstanceUsers: addrs.MakeMap(
|
||||
addrs.MakeMapElem(providerInstAddr, ProviderInstanceUsers{
|
||||
ResourceInstances: addrs.MakeSet(
|
||||
fooA.Instance(addrs.NoKey),
|
||||
fooB.Instance(addrs.NoKey),
|
||||
),
|
||||
}),
|
||||
),
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Error("wrong result\n" + diff)
|
||||
}
|
||||
}
|
||||
234
internal/shared/ephemeral_resource.go
Normal file
234
internal/shared/ephemeral_resource.go
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
// Copyright (c) The OpenTofu Authors
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
// Copyright (c) 2023 HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/hcl/v2"
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/configs/configschema"
|
||||
"github.com/opentofu/opentofu/internal/providers"
|
||||
"github.com/opentofu/opentofu/internal/tfdiags"
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
)
|
||||
|
||||
type EphemeralResourceHooks struct {
|
||||
PreOpen func(addrs.AbsResourceInstance)
|
||||
PostOpen func(addrs.AbsResourceInstance, tfdiags.Diagnostics)
|
||||
PreRenew func(addrs.AbsResourceInstance)
|
||||
PostRenew func(addrs.AbsResourceInstance, tfdiags.Diagnostics)
|
||||
PreClose func(addrs.AbsResourceInstance)
|
||||
PostClose func(addrs.AbsResourceInstance, tfdiags.Diagnostics)
|
||||
}
|
||||
|
||||
type EphemeralCloseFunc func(context.Context) tfdiags.Diagnostics
|
||||
|
||||
func OpenEphemeralResourceInstance(
|
||||
ctx context.Context,
|
||||
// TODO once we remove the old engine, this can be condensed using the new engine types
|
||||
addr addrs.AbsResourceInstance,
|
||||
schema *configschema.Block,
|
||||
providerAddr addrs.AbsProviderInstanceCorrect,
|
||||
provider providers.Interface,
|
||||
configVal cty.Value,
|
||||
hooks EphemeralResourceHooks,
|
||||
) (cty.Value, EphemeralCloseFunc, tfdiags.Diagnostics) {
|
||||
var newVal cty.Value
|
||||
var diags tfdiags.Diagnostics
|
||||
|
||||
// Unmark before sending to provider, will re-mark before returning
|
||||
configVal, pvm := configVal.UnmarkDeepWithPaths()
|
||||
|
||||
log.Printf("[TRACE] OpenEphemeralResourceInstance: Re-validating config for %s", addr)
|
||||
validateResp := provider.ValidateEphemeralConfig(
|
||||
ctx,
|
||||
providers.ValidateEphemeralConfigRequest{
|
||||
TypeName: addr.ContainingResource().Resource.Type,
|
||||
Config: configVal,
|
||||
},
|
||||
)
|
||||
diags = diags.Append(validateResp.Diagnostics)
|
||||
if diags.HasErrors() {
|
||||
return newVal, nil, diags
|
||||
}
|
||||
|
||||
// If we get down here then our configuration is complete and we're ready
|
||||
// to actually call the provider to open the ephemeral resource.
|
||||
log.Printf("[TRACE] OpenEphemeralResourceInstance: %s configuration is complete, so calling the provider", addr)
|
||||
|
||||
if hooks.PreOpen != nil {
|
||||
hooks.PreOpen(addr)
|
||||
}
|
||||
|
||||
openReq := providers.OpenEphemeralResourceRequest{
|
||||
TypeName: addr.ContainingResource().Resource.Type,
|
||||
Config: configVal,
|
||||
}
|
||||
openResp := provider.OpenEphemeralResource(ctx, openReq)
|
||||
diags = diags.Append(openResp.Diagnostics)
|
||||
if diags.HasErrors() {
|
||||
return newVal, nil, diags
|
||||
}
|
||||
|
||||
newVal = openResp.Result
|
||||
|
||||
// Encapsulate validation for easier close handling
|
||||
func() {
|
||||
for _, err := range newVal.Type().TestConformance(schema.ImpliedType()) {
|
||||
diags = diags.Append(tfdiags.Sourceless(
|
||||
tfdiags.Error,
|
||||
"Provider produced invalid object",
|
||||
fmt.Sprintf(
|
||||
"Provider %q produced an invalid value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.",
|
||||
providerAddr, tfdiags.FormatErrorPrefixed(err, addr.String()),
|
||||
),
|
||||
))
|
||||
}
|
||||
if diags.HasErrors() {
|
||||
return
|
||||
}
|
||||
|
||||
if newVal.IsNull() {
|
||||
diags = diags.Append(tfdiags.Sourceless(
|
||||
tfdiags.Error,
|
||||
"Provider produced null object",
|
||||
fmt.Sprintf(
|
||||
"Provider %q produced a null value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.",
|
||||
providerAddr, addr,
|
||||
),
|
||||
))
|
||||
return
|
||||
}
|
||||
|
||||
if !newVal.IsNull() && !newVal.IsWhollyKnown() {
|
||||
diags = diags.Append(tfdiags.Sourceless(
|
||||
tfdiags.Error,
|
||||
"Provider produced invalid object",
|
||||
fmt.Sprintf(
|
||||
"Provider %q produced a value for %s that is not wholly known.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.",
|
||||
providerAddr, addr,
|
||||
),
|
||||
))
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if diags.HasErrors() {
|
||||
// We have an open ephemeral resource, but don't plan to use it due to validation errors
|
||||
// It needs to be closed before we can return
|
||||
|
||||
closReq := providers.CloseEphemeralResourceRequest{
|
||||
TypeName: addr.Resource.Resource.Type,
|
||||
Private: openResp.Private,
|
||||
}
|
||||
closeResp := provider.CloseEphemeralResource(ctx, closReq)
|
||||
diags = diags.Append(closeResp.Diagnostics)
|
||||
|
||||
return newVal, nil, diags
|
||||
}
|
||||
|
||||
// TODO see if this conflicts with anything in the new engine?
|
||||
if len(pvm) > 0 {
|
||||
newVal = newVal.MarkWithPaths(pvm)
|
||||
}
|
||||
|
||||
if hooks.PostOpen != nil {
|
||||
hooks.PostOpen(addr, diags)
|
||||
}
|
||||
|
||||
// Initialize the closing channel and the channel that sends diagnostics back to the close caller.
|
||||
closeCh := make(chan context.Context, 1)
|
||||
diagsCh := make(chan tfdiags.Diagnostics, 1)
|
||||
go func() {
|
||||
var diags tfdiags.Diagnostics
|
||||
renewAt := openResp.RenewAt
|
||||
privateData := openResp.Private
|
||||
|
||||
closeCtx := ctx
|
||||
|
||||
// We have two exit paths that should take the same route
|
||||
func() {
|
||||
for {
|
||||
// Select on nil chan will block until other case close or done
|
||||
var renewAtTimer chan time.Time
|
||||
if renewAt != nil {
|
||||
time.After(time.Until(*renewAt))
|
||||
}
|
||||
|
||||
select {
|
||||
case <-renewAtTimer:
|
||||
if hooks.PreRenew != nil {
|
||||
hooks.PreRenew(addr)
|
||||
}
|
||||
|
||||
renewReq := providers.RenewEphemeralResourceRequest{
|
||||
TypeName: addr.Resource.Resource.Type,
|
||||
Private: privateData,
|
||||
}
|
||||
renewResp := provider.RenewEphemeralResource(ctx, renewReq)
|
||||
diags = diags.Append(renewResp.Diagnostics)
|
||||
// TODO consider what happens if renew fails, do we still want to update private?
|
||||
renewAt = renewResp.RenewAt
|
||||
|
||||
if hooks.PostRenew != nil {
|
||||
hooks.PostRenew(addr, diags)
|
||||
}
|
||||
privateData = renewResp.Private
|
||||
case closeCtx = <-closeCh:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
// Even though the context is "Done" we still want to execute the close operation
|
||||
closeCtx = context.WithoutCancel(closeCtx)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if hooks.PreClose != nil {
|
||||
hooks.PreClose(addr)
|
||||
}
|
||||
|
||||
closReq := providers.CloseEphemeralResourceRequest{
|
||||
TypeName: addr.Resource.Resource.Type,
|
||||
Private: privateData,
|
||||
}
|
||||
closeResp := provider.CloseEphemeralResource(closeCtx, closReq)
|
||||
diags = diags.Append(closeResp.Diagnostics)
|
||||
|
||||
if hooks.PostClose != nil {
|
||||
hooks.PostClose(addr, diags)
|
||||
}
|
||||
|
||||
diagsCh <- diags
|
||||
}()
|
||||
|
||||
closeFunc := func(ctx context.Context) tfdiags.Diagnostics {
|
||||
closeCh <- ctx
|
||||
close(closeCh)
|
||||
defer func() {
|
||||
close(diagsCh)
|
||||
}()
|
||||
|
||||
timeout := 10 * time.Second
|
||||
select {
|
||||
case d := <-diagsCh:
|
||||
return d
|
||||
case <-time.After(timeout):
|
||||
return tfdiags.Diagnostics{}.Append(&hcl.Diagnostic{
|
||||
Severity: hcl.DiagError,
|
||||
Summary: "Closing ephemeral resource timed out",
|
||||
Detail: fmt.Sprintf("The ephemeral resource %q timed out on closing after %s", addr.String(), timeout),
|
||||
//TODO Subject: n.Config.DeclRange.Ptr(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return newVal, closeFunc, diags
|
||||
}
|
||||
|
|
@ -11,7 +11,6 @@ import (
|
|||
"log"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/hcl/v2"
|
||||
"github.com/zclconf/go-cty/cty"
|
||||
|
|
@ -19,7 +18,7 @@ import (
|
|||
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/checks"
|
||||
"github.com/opentofu/opentofu/internal/communicator/shared"
|
||||
commShared "github.com/opentofu/opentofu/internal/communicator/shared"
|
||||
"github.com/opentofu/opentofu/internal/configs"
|
||||
"github.com/opentofu/opentofu/internal/configs/configschema"
|
||||
"github.com/opentofu/opentofu/internal/encryption"
|
||||
|
|
@ -31,6 +30,7 @@ import (
|
|||
"github.com/opentofu/opentofu/internal/plans/objchange"
|
||||
"github.com/opentofu/opentofu/internal/providers"
|
||||
"github.com/opentofu/opentofu/internal/provisioners"
|
||||
"github.com/opentofu/opentofu/internal/shared"
|
||||
"github.com/opentofu/opentofu/internal/states"
|
||||
"github.com/opentofu/opentofu/internal/tfdiags"
|
||||
)
|
||||
|
|
@ -106,9 +106,8 @@ type NodeAbstractResourceInstance struct {
|
|||
// start so calls to NodeAbstractResourceInstance.Close can return no diagnostics whatsoever.
|
||||
// A common reason for which the renewal goroutine can be skipped from being created is when the ephemeral
|
||||
// resource is deferred for the apply phase.
|
||||
closeCh chan struct{}
|
||||
ephemeralDiags chan tfdiags.Diagnostics
|
||||
renewStarted atomic.Bool
|
||||
ephemeralCloseFn func() tfdiags.Diagnostics
|
||||
renewStarted atomic.Bool
|
||||
}
|
||||
|
||||
// NewNodeAbstractResourceInstance creates an abstract resource instance graph
|
||||
|
|
@ -2056,121 +2055,77 @@ func (n *NodeAbstractResourceInstance) providerMetas(ctx context.Context, evalCt
|
|||
|
||||
func (n *NodeAbstractResourceInstance) openEphemeralResource(ctx context.Context, evalCtx EvalContext, configVal cty.Value) (cty.Value, tfdiags.Diagnostics) {
|
||||
var diags tfdiags.Diagnostics
|
||||
var newVal cty.Value
|
||||
|
||||
config := *n.Config
|
||||
|
||||
provider, providerSchema, err := n.getProvider(ctx, evalCtx)
|
||||
diags = diags.Append(err)
|
||||
if diags.HasErrors() {
|
||||
return newVal, diags
|
||||
return cty.NilVal, diags
|
||||
}
|
||||
schema, _ := providerSchema.SchemaForResourceAddr(n.Addr.ContainingResource().Resource)
|
||||
if schema == nil {
|
||||
// Should be caught during validation, so we don't bother with a pretty error here
|
||||
diags = diags.Append(fmt.Errorf("provider %q does not support ephemeral resource %q", n.ResolvedProvider.ProviderConfig, n.Addr.ContainingResource().Resource.Type))
|
||||
return newVal, diags
|
||||
return cty.NilVal, diags
|
||||
}
|
||||
|
||||
// Unmark before sending to provider, will re-mark before returning
|
||||
var pvm []cty.PathValueMarks
|
||||
configVal, pvm = configVal.UnmarkDeepWithPaths()
|
||||
|
||||
log.Printf("[TRACE] openEphemeralResource: Re-validating config for %s", n.Addr)
|
||||
validateResp := provider.ValidateEphemeralConfig(
|
||||
newVal, closeFn, openDiags := shared.OpenEphemeralResourceInstance(
|
||||
ctx,
|
||||
providers.ValidateEphemeralConfigRequest{
|
||||
TypeName: n.Addr.ContainingResource().Resource.Type,
|
||||
Config: configVal,
|
||||
n.Addr,
|
||||
schema,
|
||||
n.ResolvedProvider.ProviderConfig.Correct().Instance(n.ResolvedProviderKey),
|
||||
provider,
|
||||
configVal,
|
||||
shared.EphemeralResourceHooks{
|
||||
PreOpen: func(addr addrs.AbsResourceInstance) {
|
||||
_ = evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PreOpen(addr)
|
||||
})
|
||||
},
|
||||
PostOpen: func(addr addrs.AbsResourceInstance, diags tfdiags.Diagnostics) {
|
||||
_ = evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PostOpen(addr, diags.Err())
|
||||
})
|
||||
},
|
||||
PreRenew: func(addr addrs.AbsResourceInstance) {
|
||||
_ = evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PreRenew(addr)
|
||||
})
|
||||
},
|
||||
PostRenew: func(addr addrs.AbsResourceInstance, diags tfdiags.Diagnostics) {
|
||||
_ = evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PostRenew(addr, diags.Err())
|
||||
})
|
||||
},
|
||||
PreClose: func(addr addrs.AbsResourceInstance) {
|
||||
_ = evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PreClose(addr)
|
||||
})
|
||||
},
|
||||
PostClose: func(addr addrs.AbsResourceInstance, diags tfdiags.Diagnostics) {
|
||||
_ = evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PostClose(addr, diags.Err())
|
||||
})
|
||||
},
|
||||
},
|
||||
)
|
||||
diags = diags.Append(validateResp.Diagnostics.InConfigBody(config.Config, n.Addr.String()))
|
||||
|
||||
diags = diags.Append(openDiags.InConfigBody(config.Config, n.Addr.String()))
|
||||
if diags.HasErrors() {
|
||||
return newVal, diags
|
||||
}
|
||||
|
||||
// If we get down here then our configuration is complete and we're ready
|
||||
// to actually call the provider to open the ephemeral resource.
|
||||
log.Printf("[TRACE] openEphemeralResource: %s configuration is complete, so calling the provider", n.Addr)
|
||||
|
||||
diags = diags.Append(evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PreOpen(n.Addr)
|
||||
}))
|
||||
if diags.HasErrors() {
|
||||
return newVal, diags
|
||||
n.ephemeralCloseFn = func() tfdiags.Diagnostics {
|
||||
// We use the same context for close here, not sure if we want to consider using the context for the close node instead
|
||||
return closeFn(ctx).InConfigBody(config.Config, n.Addr.String())
|
||||
}
|
||||
|
||||
req := providers.OpenEphemeralResourceRequest{
|
||||
TypeName: n.Addr.ContainingResource().Resource.Type,
|
||||
Config: configVal,
|
||||
}
|
||||
resp := provider.OpenEphemeralResource(ctx, req)
|
||||
diags = diags.Append(resp.Diagnostics.InConfigBody(config.Config, n.Addr.String()))
|
||||
if diags.HasErrors() {
|
||||
return newVal, diags
|
||||
}
|
||||
|
||||
newVal = resp.Result
|
||||
|
||||
for _, err := range newVal.Type().TestConformance(schema.ImpliedType()) {
|
||||
diags = diags.Append(tfdiags.Sourceless(
|
||||
tfdiags.Error,
|
||||
"Provider produced invalid object",
|
||||
fmt.Sprintf(
|
||||
"Provider %q produced an invalid value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.",
|
||||
n.ResolvedProvider.ProviderConfig.InstanceString(n.ResolvedProviderKey), tfdiags.FormatErrorPrefixed(err, n.Addr.String()),
|
||||
),
|
||||
))
|
||||
}
|
||||
if diags.HasErrors() {
|
||||
return newVal, diags
|
||||
}
|
||||
|
||||
if newVal.IsNull() {
|
||||
diags = diags.Append(tfdiags.Sourceless(
|
||||
tfdiags.Error,
|
||||
"Provider produced null object",
|
||||
fmt.Sprintf(
|
||||
"Provider %q produced a null value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.",
|
||||
n.ResolvedProvider.ProviderConfig.InstanceString(n.ResolvedProviderKey), n.Addr,
|
||||
),
|
||||
))
|
||||
return newVal, diags
|
||||
}
|
||||
|
||||
if !newVal.IsNull() && !newVal.IsWhollyKnown() {
|
||||
diags = diags.Append(tfdiags.Sourceless(
|
||||
tfdiags.Error,
|
||||
"Provider produced invalid object",
|
||||
fmt.Sprintf(
|
||||
"Provider %q produced a value for %s that is not wholly known.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.",
|
||||
n.ResolvedProvider.ProviderConfig.InstanceString(n.ResolvedProviderKey), n.Addr,
|
||||
),
|
||||
))
|
||||
return newVal, diags
|
||||
}
|
||||
|
||||
if len(pvm) > 0 {
|
||||
newVal = newVal.MarkWithPaths(pvm)
|
||||
}
|
||||
diags = diags.Append(evalCtx.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PostOpen(n.Addr, diags.Err())
|
||||
}))
|
||||
|
||||
// Initialize the closing channel and the channel that sends diagnostics back to the
|
||||
// NodeAbstractResourceInstance.Close caller.
|
||||
n.closeCh = make(chan struct{}, 1)
|
||||
n.ephemeralDiags = make(chan tfdiags.Diagnostics, 1)
|
||||
// Due to the go scheduler inner works, the goroutine spawned below can be actually scheduled
|
||||
// later than the execution of the nodeCloseableResource graph node.
|
||||
// Therefore, we want to mark the renewal process as started before the goroutine spawning to be sure
|
||||
// that the execution of nodeCloseableResource will block on the diagnostics reported by the
|
||||
// goroutine below.
|
||||
n.renewStarted.Store(true)
|
||||
// The renewer is taking care of calling provider.Renew if resp.RenewAt != nil.
|
||||
// But if resp.RenewAt == nil, renewer holds only the resp.Private that will be used later
|
||||
// when calling provider.CloseEphemeralResource.
|
||||
go n.startEphemeralRenew(ctx, evalCtx, provider, resp.RenewAt, resp.Private)
|
||||
|
||||
return newVal, diags
|
||||
}
|
||||
|
|
@ -2724,11 +2679,11 @@ func (n *NodeAbstractResourceInstance) applyProvisioners(ctx context.Context, ev
|
|||
}
|
||||
|
||||
// start with an empty connInfo
|
||||
connInfo := cty.NullVal(shared.ConnectionBlockSupersetSchema.ImpliedType())
|
||||
connInfo := cty.NullVal(commShared.ConnectionBlockSupersetSchema.ImpliedType())
|
||||
|
||||
if connBody != nil {
|
||||
var connInfoDiags tfdiags.Diagnostics
|
||||
connInfo, connInfoDiags = evalScope(ctx, evalCtx, connBody, self, shared.ConnectionBlockSupersetSchema)
|
||||
connInfo, connInfoDiags = evalScope(ctx, evalCtx, connBody, self, commShared.ConnectionBlockSupersetSchema)
|
||||
diags = diags.Append(connInfoDiags)
|
||||
if diags.HasErrors() {
|
||||
return diags
|
||||
|
|
@ -3427,74 +3382,6 @@ func (n *NodeAbstractResourceInstance) planEphemeralResource(ctx context.Context
|
|||
return plannedChange, plannedNewState, keyData, diags
|
||||
}
|
||||
|
||||
func (n *NodeAbstractResourceInstance) startEphemeralRenew(ctx context.Context, evalContext EvalContext, provider providers.Interface, renewAt *time.Time, privateData []byte) {
|
||||
if n.Addr.Resource.Resource.Mode != addrs.EphemeralResourceMode {
|
||||
panic("renewal process cannot be started for resources other than ephemeral ones. This is an OpenTofu issue, please report it")
|
||||
}
|
||||
privateData, diags := n.renewEphemeral(ctx, evalContext, provider, renewAt, privateData)
|
||||
// wait for the close signal. This is like this because the renewEphemeral can return right away if the renewAt is nil.
|
||||
// But the close of the ephemeral should happen only when the graph walk is reaching the execution of the closing
|
||||
// ephemeral resource node.
|
||||
<-n.closeCh
|
||||
diags = diags.Append(n.closeEphemeralResource(ctx, evalContext, provider, privateData))
|
||||
n.ephemeralDiags <- diags
|
||||
}
|
||||
|
||||
func (n *NodeAbstractResourceInstance) closeEphemeralResource(ctx context.Context, evalContext EvalContext, provider providers.Interface, privateData []byte) (diags tfdiags.Diagnostics) {
|
||||
req := providers.CloseEphemeralResourceRequest{
|
||||
TypeName: n.Addr.Resource.Resource.Type,
|
||||
Private: privateData,
|
||||
}
|
||||
|
||||
// We are using cty.EmptyObject for the PreApply and PostApply because the prior state
|
||||
// and the new planned state does not matter in ephemeral resources context, especially
|
||||
// in the context of the close operation.
|
||||
diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PreClose(n.Addr)
|
||||
}))
|
||||
resp := provider.CloseEphemeralResource(ctx, req)
|
||||
diags = diags.Append(resp.Diagnostics)
|
||||
|
||||
diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PostClose(n.Addr, diags.Err())
|
||||
}))
|
||||
return diags.Append(diags)
|
||||
}
|
||||
|
||||
// renewEphemeral is meant to be called into a goroutine. This method listens on ctx.Done and n.closeCh for ending the job and
|
||||
// to return the data.
|
||||
func (n *NodeAbstractResourceInstance) renewEphemeral(ctx context.Context, evalContext EvalContext, provider providers.Interface, renewAt *time.Time, privateData []byte) ([]byte, tfdiags.Diagnostics) {
|
||||
var diags tfdiags.Diagnostics
|
||||
for {
|
||||
if renewAt == nil {
|
||||
return privateData, diags
|
||||
}
|
||||
select {
|
||||
case <-time.After(time.Until(*renewAt)):
|
||||
case <-n.closeCh:
|
||||
return privateData, diags
|
||||
case <-ctx.Done():
|
||||
return privateData, diags
|
||||
}
|
||||
diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) {
|
||||
// We are using cty.EmptyObject here because the prior state and the new planned state does not matter
|
||||
// in ephemeral resources context, especially in the context of the renew operation.
|
||||
return h.PreRenew(n.Addr)
|
||||
}))
|
||||
req := providers.RenewEphemeralResourceRequest{
|
||||
TypeName: n.Addr.Resource.Resource.Type,
|
||||
Private: privateData,
|
||||
}
|
||||
resp := provider.RenewEphemeralResource(ctx, req)
|
||||
diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) {
|
||||
return h.PostRenew(n.Addr, diags.Err())
|
||||
}))
|
||||
diags = diags.Append(resp.Diagnostics)
|
||||
renewAt = resp.RenewAt
|
||||
privateData = resp.Private
|
||||
}
|
||||
}
|
||||
|
||||
// deferEphemeralResource is a helper function that builds a change and a state object by using a
|
||||
// partial value and is announcing the deferral of the ephemeral resource.
|
||||
func (n *NodeAbstractResourceInstance) deferEphemeralResource(evalCtx EvalContext, schema *configschema.Block, priorVal cty.Value, configVal cty.Value, reason string) (
|
||||
|
|
@ -3544,21 +3431,7 @@ func (n *NodeAbstractResourceInstance) Close() tfdiags.Diagnostics {
|
|||
// If the ephemeral resource has been deferred, this method needs to return immediately.
|
||||
return nil
|
||||
}
|
||||
defer func() {
|
||||
close(n.ephemeralDiags)
|
||||
n.renewStarted.Store(false)
|
||||
}()
|
||||
close(n.closeCh)
|
||||
timeout := 10 * time.Second
|
||||
select {
|
||||
case d := <-n.ephemeralDiags:
|
||||
return d
|
||||
case <-time.After(timeout):
|
||||
return tfdiags.Diagnostics{}.Append(&hcl.Diagnostic{
|
||||
Severity: hcl.DiagError,
|
||||
Summary: "Closing ephemeral resource timed out",
|
||||
Detail: fmt.Sprintf("The ephemeral resource %q timed out on closing after %s", n.Addr.String(), timeout),
|
||||
Subject: n.Config.DeclRange.Ptr(),
|
||||
})
|
||||
}
|
||||
defer n.renewStarted.Store(false)
|
||||
|
||||
return n.ephemeralCloseFn()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue