From 8ce780b4e08979a2fe9e07adc9d8c842c303be26 Mon Sep 17 00:00:00 2001 From: Christian Mesh Date: Wed, 4 Feb 2026 16:44:20 -0500 Subject: [PATCH] engine/ephemeral: Wire together basic ephemeral functionality (#3710) Signed-off-by: Christian Mesh --- .../applying/operations_resource_ephemeral.go | 63 ++- internal/engine/internal/exec/ephemeral.go | 17 + internal/engine/internal/exec/operations.go | 19 +- internal/engine/internal/execgraph/builder.go | 18 +- .../engine/internal/execgraph/compiler.go | 2 + .../engine/internal/execgraph/compiler_ops.go | 45 +- .../internal/execgraph/graph_unmarshal.go | 25 +- .../internal/execgraph/opcode_string.go | 7 +- .../engine/internal/execgraph/operation.go | 1 + .../internal/execgraph/operations_test.go | 26 +- .../engine/lifecycle/completion_tracker.go | 149 ------- .../lifecycle/completion_tracker_test.go | 87 ---- .../engine/planning/completion_tracker.go | 82 ---- internal/engine/planning/execgraph.go | 4 +- .../engine/planning/execgraph_ephemeral.go | 59 +++ internal/engine/planning/execgraph_managed.go | 15 +- .../engine/planning/execgraph_provider.go | 23 +- .../engine/planning/execgraph_resource.go | 16 +- internal/engine/planning/plan.go | 11 +- internal/engine/planning/plan_context.go | 48 ++- internal/engine/planning/plan_data.go | 4 - internal/engine/planning/plan_ephemeral.go | 56 ++- internal/engine/planning/plan_eval_glue.go | 43 -- internal/engine/planning/plan_managed.go | 11 +- internal/engine/planning/providers.go | 49 +-- internal/engine/plugins/plugins.go | 6 +- internal/lang/eval/config_plan.go | 8 +- internal/lang/eval/config_plan_oracle.go | 51 +-- internal/lang/eval/config_prepare.go | 113 ----- internal/lang/eval/config_prepare_test.go | 391 ------------------ internal/shared/ephemeral_resource.go | 234 +++++++++++ .../tofu/node_resource_abstract_instance.go | 233 +++-------- 32 files changed, 676 insertions(+), 1240 deletions(-) create mode 100644 internal/engine/internal/exec/ephemeral.go delete mode 100644 internal/engine/lifecycle/completion_tracker.go delete mode 100644 internal/engine/lifecycle/completion_tracker_test.go delete mode 100644 internal/engine/planning/completion_tracker.go create mode 100644 internal/engine/planning/execgraph_ephemeral.go delete mode 100644 internal/lang/eval/config_prepare_test.go create mode 100644 internal/shared/ephemeral_resource.go diff --git a/internal/engine/applying/operations_resource_ephemeral.go b/internal/engine/applying/operations_resource_ephemeral.go index 51fe54301a..888cf9c6f8 100644 --- a/internal/engine/applying/operations_resource_ephemeral.go +++ b/internal/engine/applying/operations_resource_ephemeral.go @@ -7,29 +7,78 @@ package applying import ( "context" + "fmt" "log" "github.com/opentofu/opentofu/internal/engine/internal/exec" "github.com/opentofu/opentofu/internal/lang/eval" + "github.com/opentofu/opentofu/internal/shared" + "github.com/opentofu/opentofu/internal/states" "github.com/opentofu/opentofu/internal/tfdiags" ) // EphemeralOpen implements [exec.Operations]. func (ops *execOperations) EphemeralOpen( ctx context.Context, - desired *eval.DesiredResourceInstance, + inst *eval.DesiredResourceInstance, providerClient *exec.ProviderClient, +) (*exec.OpenEphemeralResourceInstance, tfdiags.Diagnostics) { + log.Printf("[TRACE] apply phase: EphemeralOpen %s using %s", inst.Addr, providerClient.InstanceAddr) + + var diags tfdiags.Diagnostics + + schema, _ := ops.plugins.ResourceTypeSchema(ctx, inst.Provider, inst.Addr.Resource.Resource.Mode, inst.Addr.Resource.Resource.Type) + if schema == nil || schema.Block == nil { + // Should be caught during validation, so we don't bother with a pretty error here + diags = diags.Append(fmt.Errorf("provider %q does not support ephemeral resource %q", inst.ProviderInstance, inst.Addr.Resource.Resource.Type)) + return nil, diags + } + + newVal, closeFunc, openDiags := shared.OpenEphemeralResourceInstance( + ctx, + inst.Addr, + schema.Block, + *inst.ProviderInstance, + providerClient.Ops, + inst.ConfigVal, + shared.EphemeralResourceHooks{}, + ) + diags = diags.Append(openDiags) + if openDiags.HasErrors() { + return nil, diags + } + + state := &exec.ResourceInstanceObject{ + InstanceAddr: inst.Addr, + State: &states.ResourceInstanceObjectFull{ + Status: states.ObjectReady, + Value: newVal, + // TODO Not sure these fields are needed + ResourceType: inst.Addr.Resource.Resource.Type, + ProviderInstanceAddr: providerClient.InstanceAddr, + //SchemaVersion: uint64(schema.Version), + //Private: resp.Private, + }, + } + + return &exec.OpenEphemeralResourceInstance{ + State: state, + Close: closeFunc, + }, diags +} + +// EphemeralState implements [exec.Operations] +func (ops *execOperations) EphemeralState( + ctx context.Context, + ephemeralInst *exec.OpenEphemeralResourceInstance, ) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) { - log.Printf("[TRACE] apply phase: EphemeralOpen %s using %s", desired.Addr, providerClient.InstanceAddr) - panic("unimplemented") + return ephemeralInst.State, nil } // EphemeralClose implements [exec.Operations]. func (ops *execOperations) EphemeralClose( ctx context.Context, - object *exec.ResourceInstanceObject, - providerClient *exec.ProviderClient, + ephemeralInst *exec.OpenEphemeralResourceInstance, ) tfdiags.Diagnostics { - log.Printf("[TRACE] apply phase: EphemeralClose %s using %s", object.InstanceAddr, providerClient.InstanceAddr) - panic("unimplemented") + return ephemeralInst.Close(ctx) } diff --git a/internal/engine/internal/exec/ephemeral.go b/internal/engine/internal/exec/ephemeral.go new file mode 100644 index 0000000000..7e744963d7 --- /dev/null +++ b/internal/engine/internal/exec/ephemeral.go @@ -0,0 +1,17 @@ +// Copyright (c) The OpenTofu Authors +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2023 HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package exec + +import ( + "context" + + "github.com/opentofu/opentofu/internal/tfdiags" +) + +type OpenEphemeralResourceInstance struct { + State *ResourceInstanceObject + Close func(context.Context) tfdiags.Diagnostics +} diff --git a/internal/engine/internal/exec/operations.go b/internal/engine/internal/exec/operations.go index cf6c7e5fa6..d5c6188723 100644 --- a/internal/engine/internal/exec/operations.go +++ b/internal/engine/internal/exec/operations.go @@ -271,19 +271,25 @@ type Operations interface { // detail of whatever is managing a provider's operation, with the execution // graph just assuming that ephemeral objects remain valid _somehow_ for // the full duration of their use. + EphemeralOpen( + ctx context.Context, + desired *eval.DesiredResourceInstance, + providerClient *ProviderClient, + ) (*OpenEphemeralResourceInstance, tfdiags.Diagnostics) + + // EphemeralState refines the open ephemeral resource instance into the + // required resource object state // // Execution graph processing automatically passes the result of this // function to [Operations.ResourceInstancePostconditions] when appropriate, // propagating any additional diagnostics it returns, and so implementers of // this method should not attempt to handle postconditions themselves. - EphemeralOpen( + EphemeralState( ctx context.Context, - desired *eval.DesiredResourceInstance, - providerClient *ProviderClient, + ephemeral *OpenEphemeralResourceInstance, ) (*ResourceInstanceObject, tfdiags.Diagnostics) - // EphemeralClose uses the given provider client to "close" the given - // ephemeral object. + // EphemeralClose calls Close on the open ephemeral resource instance // // A valid execution graph ensures that this is called only after all other // operations using the given object have either completed or have been @@ -292,7 +298,6 @@ type Operations interface { // after this method returns. EphemeralClose( ctx context.Context, - object *ResourceInstanceObject, - providerClient *ProviderClient, + ephemeral *OpenEphemeralResourceInstance, ) tfdiags.Diagnostics } diff --git a/internal/engine/internal/execgraph/builder.go b/internal/engine/internal/execgraph/builder.go index c764fadc89..34e8f0e70a 100644 --- a/internal/engine/internal/execgraph/builder.go +++ b/internal/engine/internal/execgraph/builder.go @@ -214,22 +214,30 @@ func (b *Builder) DataRead( func (b *Builder) EphemeralOpen( desiredInst ResultRef[*eval.DesiredResourceInstance], providerClient ResultRef[*exec.ProviderClient], -) ResourceInstanceResultRef { - return operationRef[*exec.ResourceInstanceObject](b, operationDesc{ +) ResultRef[*exec.OpenEphemeralResourceInstance] { + return operationRef[*exec.OpenEphemeralResourceInstance](b, operationDesc{ opCode: opEphemeralOpen, operands: []AnyResultRef{desiredInst, providerClient}, }) } +func (b *Builder) EphemeralState( + ephemeralInst ResultRef[*exec.OpenEphemeralResourceInstance], +) ResourceInstanceResultRef { + return operationRef[*exec.ResourceInstanceObject](b, operationDesc{ + opCode: opEphemeralState, + operands: []AnyResultRef{ephemeralInst}, + }) +} + func (b *Builder) EphemeralClose( - obj ResourceInstanceResultRef, - providerClient ResultRef[*exec.ProviderClient], + ephemeralInst ResultRef[*exec.OpenEphemeralResourceInstance], waitFor AnyResultRef, ) ResultRef[struct{}] { waiter := b.ensureWaiterRef(waitFor) return operationRef[struct{}](b, operationDesc{ opCode: opEphemeralClose, - operands: []AnyResultRef{obj, providerClient, waiter}, + operands: []AnyResultRef{ephemeralInst, waiter}, }) } diff --git a/internal/engine/internal/execgraph/compiler.go b/internal/engine/internal/execgraph/compiler.go index ae10f12136..03b827cd44 100644 --- a/internal/engine/internal/execgraph/compiler.go +++ b/internal/engine/internal/execgraph/compiler.go @@ -123,6 +123,8 @@ func (c *compiler) Compile() (*CompiledGraph, tfdiags.Diagnostics) { compileFunc = c.compileOpDataRead case opEphemeralOpen: compileFunc = c.compileOpEphemeralOpen + case opEphemeralState: + compileFunc = c.compileOpEphemeralState case opEphemeralClose: compileFunc = c.compileOpEphemeralClose default: diff --git a/internal/engine/internal/execgraph/compiler_ops.go b/internal/engine/internal/execgraph/compiler_ops.go index dd6419b06f..09a4e3b61d 100644 --- a/internal/engine/internal/execgraph/compiler_ops.go +++ b/internal/engine/internal/execgraph/compiler_ops.go @@ -339,15 +339,12 @@ func (c *compiler) compileOpEphemeralOpen(operands *compilerOperands) nodeExecut ret, moreDiags := ops.EphemeralOpen(ctx, desired, providerClient) diags = diags.Append(moreDiags) - // TODO: Also call ops.ResourceInstancePostconditions - log.Printf("[WARN] opEphemeralOpen doesn't yet handle postconditions") return ret, !diags.HasErrors(), diags } } -func (c *compiler) compileOpEphemeralClose(operands *compilerOperands) nodeExecuteRaw { - getObject := nextOperand[*exec.ResourceInstanceObject](operands) - getProviderClient := nextOperand[*exec.ProviderClient](operands) +func (c *compiler) compileOpEphemeralState(operands *compilerOperands) nodeExecuteRaw { + getEphemeral := nextOperand[*exec.OpenEphemeralResourceInstance](operands) diags := operands.Finish() c.diags = c.diags.Append(diags) if diags.HasErrors() { @@ -356,18 +353,42 @@ func (c *compiler) compileOpEphemeralClose(operands *compilerOperands) nodeExecu ops := c.ops return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { - providerClient, ok, moreDiags := getProviderClient(ctx) - diags = diags.Append(moreDiags) - if !ok { - return nil, false, diags - } - object, ok, moreDiags := getObject(ctx) + ephemeral, ok, moreDiags := getEphemeral(ctx) diags = diags.Append(moreDiags) if !ok { return nil, false, diags } - moreDiags = ops.EphemeralClose(ctx, object, providerClient) + ret, moreDiags := ops.EphemeralState(ctx, ephemeral) + diags = diags.Append(moreDiags) + // TODO: Also call ops.ResourceInstancePostconditions + log.Printf("[WARN] opEphemeralState doesn't yet handle postconditions") + return ret, !diags.HasErrors(), diags + } +} + +func (c *compiler) compileOpEphemeralClose(operands *compilerOperands) nodeExecuteRaw { + getEphemeral := nextOperand[*exec.OpenEphemeralResourceInstance](operands) + waitForUsers := operands.OperandWaiter() + diags := operands.Finish() + c.diags = c.diags.Append(diags) + if diags.HasErrors() { + return nil + } + ops := c.ops + + return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + // We intentionally ignore results here because we want to close the + // ephemeral even if one of its users fails. + waitForUsers(ctx) + + ephemeral, ok, moreDiags := getEphemeral(ctx) + diags = diags.Append(moreDiags) + if !ok { + return nil, false, diags + } + + moreDiags = ops.EphemeralClose(ctx, ephemeral) diags = diags.Append(moreDiags) return struct{}{}, !diags.HasErrors(), diags } diff --git a/internal/engine/internal/execgraph/graph_unmarshal.go b/internal/engine/internal/execgraph/graph_unmarshal.go index 4c204acfc8..3eee929708 100644 --- a/internal/engine/internal/execgraph/graph_unmarshal.go +++ b/internal/engine/internal/execgraph/graph_unmarshal.go @@ -135,6 +135,8 @@ func unmarshalOperationElem(protoOp *execgraphproto.Operation, prevResults []Any return unmarshalOpDataRead(protoOp.GetOperands(), prevResults, builder) case opEphemeralOpen: return unmarshalOpEphemeralOpen(protoOp.GetOperands(), prevResults, builder) + case opEphemeralState: + return unmarshalOpEphemeralState(protoOp.GetOperands(), prevResults, builder) case opEphemeralClose: return unmarshalOpEphemeralClose(protoOp.GetOperands(), prevResults, builder) default: @@ -310,23 +312,30 @@ func unmarshalOpEphemeralOpen(rawOperands []uint64, prevResults []AnyResultRef, return builder.EphemeralOpen(desiredInst, providerClient), nil } -func unmarshalOpEphemeralClose(rawOperands []uint64, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) { - if len(rawOperands) != 3 { +func unmarshalOpEphemeralState(rawOperands []uint64, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) { + if len(rawOperands) != 1 { return nil, fmt.Errorf("wrong number of operands (%d) for opDataRead", len(rawOperands)) } - obj, err := unmarshalGetPrevResultOf[*exec.ResourceInstanceObject](prevResults, rawOperands[0]) + ephemeralInst, err := unmarshalGetPrevResultOf[*exec.OpenEphemeralResourceInstance](prevResults, rawOperands[0]) if err != nil { return nil, fmt.Errorf("invalid opDataRead desiredInst: %w", err) } - providerClient, err := unmarshalGetPrevResultOf[*exec.ProviderClient](prevResults, rawOperands[1]) - if err != nil { - return nil, fmt.Errorf("invalid opDataRead providerClient: %w", err) + return builder.EphemeralState(ephemeralInst), nil +} + +func unmarshalOpEphemeralClose(rawOperands []uint64, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) { + if len(rawOperands) != 2 { + return nil, fmt.Errorf("wrong number of operands (%d) for opDataRead", len(rawOperands)) } - waitFor, err := unmarshalGetPrevResultWaiter(prevResults, rawOperands[2]) + ephemeralInst, err := unmarshalGetPrevResultOf[*exec.OpenEphemeralResourceInstance](prevResults, rawOperands[0]) + if err != nil { + return nil, fmt.Errorf("invalid opDataRead desiredInst: %w", err) + } + waitFor, err := unmarshalGetPrevResultWaiter(prevResults, rawOperands[1]) if err != nil { return nil, fmt.Errorf("invalid opDataRead waitFor: %w", err) } - return builder.EphemeralClose(obj, providerClient, waitFor), nil + return builder.EphemeralClose(ephemeralInst, waitFor), nil } func unmarshalWaiterElem(protoWaiter *execgraphproto.Waiter, prevResults []AnyResultRef, builder *Builder) (AnyResultRef, error) { diff --git a/internal/engine/internal/execgraph/opcode_string.go b/internal/engine/internal/execgraph/opcode_string.go index 544e8da7a3..d803ac78f7 100644 --- a/internal/engine/internal/execgraph/opcode_string.go +++ b/internal/engine/internal/execgraph/opcode_string.go @@ -19,12 +19,13 @@ func _() { _ = x[opManagedAlreadyDeposed-9] _ = x[opDataRead-10] _ = x[opEphemeralOpen-11] - _ = x[opEphemeralClose-12] + _ = x[opEphemeralState-12] + _ = x[opEphemeralClose-13] } -const _opCode_name = "opProviderInstanceConfigopProviderInstanceOpenopProviderInstanceCloseopResourceInstanceDesiredopResourceInstancePrioropManagedFinalPlanopManagedApplyopManagedDeposeopManagedAlreadyDeposedopDataReadopEphemeralOpenopEphemeralClose" +const _opCode_name = "opProviderInstanceConfigopProviderInstanceOpenopProviderInstanceCloseopResourceInstanceDesiredopResourceInstancePrioropManagedFinalPlanopManagedApplyopManagedDeposeopManagedAlreadyDeposedopDataReadopEphemeralOpenopEphemeralStateopEphemeralClose" -var _opCode_index = [...]uint8{0, 24, 46, 69, 94, 117, 135, 149, 164, 187, 197, 212, 228} +var _opCode_index = [...]uint8{0, 24, 46, 69, 94, 117, 135, 149, 164, 187, 197, 212, 228, 244} func (i opCode) String() string { idx := int(i) - 1 diff --git a/internal/engine/internal/execgraph/operation.go b/internal/engine/internal/execgraph/operation.go index 8260dadfbc..6b796a49dc 100644 --- a/internal/engine/internal/execgraph/operation.go +++ b/internal/engine/internal/execgraph/operation.go @@ -40,6 +40,7 @@ const ( opDataRead opEphemeralOpen + opEphemeralState opEphemeralClose ) diff --git a/internal/engine/internal/execgraph/operations_test.go b/internal/engine/internal/execgraph/operations_test.go index d37b64e0c4..0b34d8e632 100644 --- a/internal/engine/internal/execgraph/operations_test.go +++ b/internal/engine/internal/execgraph/operations_test.go @@ -23,8 +23,9 @@ type mockOperations struct { Calls []mockOperationsCall DataReadFunc func(ctx context.Context, desired *eval.DesiredResourceInstance, plannedVal cty.Value, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) - EphemeralCloseFunc func(ctx context.Context, object *exec.ResourceInstanceObject, providerClient *exec.ProviderClient) tfdiags.Diagnostics - EphemeralOpenFunc func(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) + EphemeralCloseFunc func(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) tfdiags.Diagnostics + EphemeralOpenFunc func(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.OpenEphemeralResourceInstance, tfdiags.Diagnostics) + EphemeralStateFunc func(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) ManagedAlreadyDeposedFunc func(ctx context.Context, instAddr addrs.AbsResourceInstance, deposedKey states.DeposedKey) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) ManagedApplyFunc func(ctx context.Context, plan *exec.ManagedResourceObjectFinalPlan, fallback *exec.ResourceInstanceObject, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) ManagedDeposeFunc func(ctx context.Context, instAddr addrs.AbsResourceInstance) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) @@ -53,19 +54,19 @@ func (m *mockOperations) DataRead(ctx context.Context, desired *eval.DesiredReso } // EphemeralClose implements [exec.Operations]. -func (m *mockOperations) EphemeralClose(ctx context.Context, object *exec.ResourceInstanceObject, providerClient *exec.ProviderClient) tfdiags.Diagnostics { +func (m *mockOperations) EphemeralClose(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) tfdiags.Diagnostics { var diags tfdiags.Diagnostics if m.EphemeralCloseFunc != nil { - diags = m.EphemeralCloseFunc(ctx, object, providerClient) + diags = m.EphemeralCloseFunc(ctx, ephemeral) } - m.appendLog("EphemeralClose", []any{object, providerClient}, struct{}{}) + m.appendLog("EphemeralClose", []any{ephemeral}, struct{}{}) return diags } // EphemeralOpen implements [exec.Operations]. -func (m *mockOperations) EphemeralOpen(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) { +func (m *mockOperations) EphemeralOpen(ctx context.Context, desired *eval.DesiredResourceInstance, providerClient *exec.ProviderClient) (*exec.OpenEphemeralResourceInstance, tfdiags.Diagnostics) { var diags tfdiags.Diagnostics - var result *exec.ResourceInstanceObject + var result *exec.OpenEphemeralResourceInstance if m.EphemeralOpenFunc != nil { result, diags = m.EphemeralOpenFunc(ctx, desired, providerClient) } @@ -73,6 +74,17 @@ func (m *mockOperations) EphemeralOpen(ctx context.Context, desired *eval.Desire return result, diags } +// EphemeralState implements [exec.Operations]. +func (m *mockOperations) EphemeralState(ctx context.Context, ephemeral *exec.OpenEphemeralResourceInstance) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + var result *exec.ResourceInstanceObject + if m.EphemeralStateFunc != nil { + result, diags = m.EphemeralStateFunc(ctx, ephemeral) + } + m.appendLog("EphemeralState", []any{ephemeral}, result) + return result, diags +} + // ManagedAlreadyDeposed implements [exec.Operations]. func (m *mockOperations) ManagedAlreadyDeposed(ctx context.Context, instAddr addrs.AbsResourceInstance, deposedKey states.DeposedKey) (*exec.ResourceInstanceObject, tfdiags.Diagnostics) { var diags tfdiags.Diagnostics diff --git a/internal/engine/lifecycle/completion_tracker.go b/internal/engine/lifecycle/completion_tracker.go deleted file mode 100644 index 0da94669b2..0000000000 --- a/internal/engine/lifecycle/completion_tracker.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright (c) The OpenTofu Authors -// SPDX-License-Identifier: MPL-2.0 -// Copyright (c) 2023 HashiCorp, Inc. -// SPDX-License-Identifier: MPL-2.0 - -package lifecycle - -import ( - "iter" - "maps" - "sync" - - "github.com/opentofu/opentofu/internal/collections" -) - -// CompletionTracker is a synchronization utility that keeps a record of the -// completion of various items and allows various different goroutines to wait -// for the completion of different subsets of the items. -// -// "Items" can be of any comparable type, but the design intention is that a -// caller will define its own types to represent the different kinds of work -// it needs to track. -type CompletionTracker[T comparable] struct { - mu sync.Mutex - completed collections.Set[T] - waiters collections.Set[*completionWaiter[T]] -} - -type completionWaiter[T comparable] struct { - pending collections.Set[T] - ch chan<- struct{} -} - -// NewCompletionTracker returns a new [CompletionTracker] that initially -// has no waiters and no completed items. -func NewCompletionTracker[T comparable]() *CompletionTracker[T] { - return &CompletionTracker[T]{ - completed: collections.NewSet[T](), - waiters: collections.NewSet[*completionWaiter[T]](), - } -} - -// ItemComplete returns true if the given item has already been reported -// as complete using [CompletionTracker.ReportCompletion]. -// -// A complete item can never become incomplete again, but if this function -// returns false then a concurrent goroutine could potentially report the -// item as complete before the caller acts on that result. -func (t *CompletionTracker[T]) ItemComplete(item T) bool { - t.mu.Lock() - _, ret := t.completed[item] - t.mu.Unlock() - return ret -} - -// NewWaiterFor returns an unbuffered channel that will be closed once all -// of the addresses in the given seqence have had their completion reported -// using [CompletionTracker.ReportCompletion]. -// -// No items will be sent to the channel. -// -// For callers that would just immediately block waiting for the given channel -// to be closed (without using it as part of a larger "select" statement), -// consider using the simpler [CompletionTracker.WaitFor] instead. -func (t *CompletionTracker[T]) NewWaiterFor(waitFor iter.Seq[T]) <-chan struct{} { - t.mu.Lock() - defer t.mu.Unlock() - - ch := make(chan struct{}) - waiter := &completionWaiter[T]{ - pending: collections.NewSet[T](), - ch: ch, - } - for item := range waitFor { - if t.completed.Has(item) { - continue // ignore any already-completed items - } - waiter.pending[item] = struct{}{} - } - - if len(waiter.pending) == 0 { - // If we didn't find any addresses that were not already completed - // then we'll just close the channel immediately before we return, - // and not track the waiter at all. - close(ch) - return ch - } - - // If we have at least one item to wait for then we'll remember this - // new tracker so we can reconsider it each time something has its - // completion reported. - t.waiters[waiter] = struct{}{} - return ch -} - -// WaitFor blocks until all of the addresses in the given set have had their -// completion reported using [CompletionTracker.ReportCompletion]. -// -// This is a convenience wrapper for [CompletionTracker.NewWaiterFor] that -// just blocks until the returned channel is closed. -func (t *CompletionTracker[T]) WaitFor(waitFor iter.Seq[T]) { - ch := t.NewWaiterFor(waitFor) - for range ch { - // just block until the channel is closed - } -} - -// ReportCompletion records the completion of the given item and signals -// any waiters for which it was the last remaining pending item. -func (t *CompletionTracker[T]) ReportCompletion(of T) { - t.mu.Lock() - t.completed[of] = struct{}{} - for waiter := range t.waiters { - delete(waiter.pending, of) - if len(waiter.pending) == 0 { - // nothing left for this waiter to wait for - close(waiter.ch) - delete(t.waiters, waiter) - } - } - t.mu.Unlock() -} - -// PendingItems returns a set of all of the items that are pending for at -// least one waiter at the time of the call. -// -// This is mainly to allow detection and cleanup of uncompleted work: once -// a caller thinks that all work ought to have completed it can call this -// function and should hopefully receive an empty set. If not, it can report -// an error about certain items being left unresolved and optionally make -// synthetic calls to [CompletionTracker.ReportCompletion] to cause all of the -// remaining waiters to be unblocked. -// -// The result is a fresh set allocated for each call, so the caller is free -// to modify that set without corrupting the internal state of the -// [CompletionTracker]. -func (t *CompletionTracker[T]) PendingItems() collections.Set[T] { - t.mu.Lock() - defer t.mu.Unlock() - - if len(t.waiters) == 0 { - return nil - } - ret := collections.NewSet[T]() - for waiter := range t.waiters { - maps.Copy(ret, waiter.pending) - } - return ret -} diff --git a/internal/engine/lifecycle/completion_tracker_test.go b/internal/engine/lifecycle/completion_tracker_test.go deleted file mode 100644 index 55b489747c..0000000000 --- a/internal/engine/lifecycle/completion_tracker_test.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright (c) The OpenTofu Authors -// SPDX-License-Identifier: MPL-2.0 -// Copyright (c) 2023 HashiCorp, Inc. -// SPDX-License-Identifier: MPL-2.0 - -package lifecycle - -import ( - "slices" - "sync/atomic" - "testing" - "time" -) - -func TestCompletionTracker(t *testing.T) { - ctx := t.Context() - - // This test would be a good candidate for the testing/synctest package, - // but at the time of writing it that package hasn't been stablized yet. - // - // Using testing/synctest would make it possible to use synctest.Wait() - // to be sure that the waiter goroutine has had a chance to write to - // the "complete" flag before we test it, and so it would not need to - // be an atomic.Bool anymore and our test that completing only a subset - // of the items doesn't unblock would be reliable rather than best-effort. - - // We'll use strings as the tracking keys here for simplicity's sake, - // but the intention is that real callers of this would define their - // own types to represent different kinds of trackable objects. - tracker := NewCompletionTracker[string]() - tracker.ReportCompletion("completed early") - - var complete atomic.Bool - waitCh := tracker.NewWaiterFor(slices.Values([]string{ - "completed early", - "completed second", - "completed last", - })) - waiterExitCh := make(chan struct{}) // closed once the goroutine below has finished waiting - go func() { - select { - case <-waitCh: - complete.Store(true) - t.Log("waiting channel was closed") - case <-ctx.Done(): - // We'll get here if the test finishes before waitCh is closed, - // suggesting that something went wrong. We'll just return to - // avoid leaking this goroutine, since the surrounding test has - // presumably already failed anyway. - } - close(waiterExitCh) - }() - - if complete.Load() { - t.Fatal("already complete before we resolved any other items") - } - t.Log("resolving the second item") - tracker.ReportCompletion("completed second") - // NOTE: The following is best effort as long as we aren't using - // test/synctest, because we can't be sure that the waiting goroutine - // has been scheduled long enough to reach the complete.Store(true). - time.Sleep(10 * time.Millisecond) - if complete.Load() { - t.Fatal("already complete before we resolved the final item") - } - t.Log("resolving the final item") - tracker.ReportCompletion("completed last") - <-waiterExitCh // wait for the waiter goroutine to exit - if !complete.Load() { - t.Fatal("not complete even though all items are resolved") - } - - // creating a waiter for items that have all already completed should - // return a channel that's already closed. - alreadyCompleteWaitCh := tracker.NewWaiterFor(slices.Values([]string{ - "completed early", - "completed last", - })) - select { - case <-alreadyCompleteWaitCh: - // the expected case - case <-time.After(1 * time.Second): - t.Fatal("already-completed waiter channel was not immediately closed") - case <-ctx.Done(): - // test has somehow already exited?! (this should not be possible) - } -} diff --git a/internal/engine/planning/completion_tracker.go b/internal/engine/planning/completion_tracker.go deleted file mode 100644 index 5c199f1600..0000000000 --- a/internal/engine/planning/completion_tracker.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright (c) The OpenTofu Authors -// SPDX-License-Identifier: MPL-2.0 -// Copyright (c) 2023 HashiCorp, Inc. -// SPDX-License-Identifier: MPL-2.0 - -package planning - -import ( - "github.com/opentofu/opentofu/internal/addrs" - "github.com/opentofu/opentofu/internal/engine/lifecycle" - "github.com/opentofu/opentofu/internal/states" -) - -// completionTracker is the concrete type of [lifecycle.CompletionTracker] we -// use to track completion during a planning operation. -// -// This uses implementations of [completionEvent] to represent the items that -// need to be completed. -type completionTracker = lifecycle.CompletionTracker[completionEvent] - -func (p *planContext) reportResourceInstancePlanCompletion(addr addrs.AbsResourceInstance) { - p.completion.ReportCompletion(resourceInstancePlanningComplete{addr.UniqueKey()}) -} - -func (p *planContext) reportResourceInstanceDeposedPlanCompletion(addr addrs.AbsResourceInstance, deposedKey states.DeposedKey) { - p.completion.ReportCompletion(resourceInstanceDeposedPlanningComplete{addr.UniqueKey(), deposedKey}) -} - -func (p *planContext) reportProviderInstanceClosed(addr addrs.AbsProviderInstanceCorrect) { - p.completion.ReportCompletion(providerInstanceClosed{addr.UniqueKey()}) -} - -// completionEvent is the type we use to represent events in -// a [completionTracker]. -// -// All implementations of this interface must be comparable. -type completionEvent interface { - completionEvent() -} - -// resourceInstancePlanningComplete represents that we've completed planning -// of a specific resource instance. -// -// Provider instances remain open until all of the resource instances that -// belong to them have completed planning, and ephemeral resource instances -// remain open until all of the other resource instances that depend on them -// have completed planning. -type resourceInstancePlanningComplete struct { - // key MUST be the unique key of an addrs.ResourceInstance. - key addrs.UniqueKey -} - -// completionEvent implements completionEvent. -func (r resourceInstancePlanningComplete) completionEvent() {} - -// resourceInstanceDeposedPlanningComplete is like -// [resourceInstancePlanningComplete] but for "deposed" objects, rather than -// for "current" objects. -type resourceInstanceDeposedPlanningComplete struct { - // key MUST be the unique key of an addrs.ResourceInstance. - instKey addrs.UniqueKey - - // deposedKey is the DeposedKey of the deposed object whose planning - // is being tracked. - deposedKey states.DeposedKey -} - -// completionEvent implements completionEvent. -func (r resourceInstanceDeposedPlanningComplete) completionEvent() {} - -// providerInstanceClosed represents that a previously-opened provider -// instance has now been closed. -// -// Ephemeral resource instances remain open until all of the provider instances -// that depend on them have been closed. -type providerInstanceClosed struct { - // key MUST be the unique key of an addrs.AbsProviderInstanceCorrect. - key addrs.UniqueKey -} - -// completionEvent implements completionEvent. -func (r providerInstanceClosed) completionEvent() {} diff --git a/internal/engine/planning/execgraph.go b/internal/engine/planning/execgraph.go index 9ec8eb5130..c567bb7a7e 100644 --- a/internal/engine/planning/execgraph.go +++ b/internal/engine/planning/execgraph.go @@ -45,6 +45,7 @@ type execGraphBuilder struct { resourceInstAddrRefs addrs.Map[addrs.AbsResourceInstance, execgraph.ResultRef[addrs.AbsResourceInstance]] providerInstAddrRefs addrs.Map[addrs.AbsProviderInstanceCorrect, execgraph.ResultRef[addrs.AbsProviderInstanceCorrect]] openProviderRefs addrs.Map[addrs.AbsProviderInstanceCorrect, execResultWithCloseBlockers[*exec.ProviderClient]] + openEphemeralRefs addrs.Map[addrs.AbsResourceInstance, registerExecCloseBlockerFunc] } // NOTE: There are additional methods for [execGraphBuilder] declared in @@ -57,6 +58,7 @@ func newExecGraphBuilder() *execGraphBuilder { resourceInstAddrRefs: addrs.MakeMap[addrs.AbsResourceInstance, execgraph.ResultRef[addrs.AbsResourceInstance]](), providerInstAddrRefs: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, execgraph.ResultRef[addrs.AbsProviderInstanceCorrect]](), openProviderRefs: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, execResultWithCloseBlockers[*exec.ProviderClient]](), + openEphemeralRefs: addrs.MakeMap[addrs.AbsResourceInstance, registerExecCloseBlockerFunc](), } } @@ -93,9 +95,7 @@ type registerExecCloseBlockerFunc func(execgraph.AnyResultRef) func (b *execGraphBuilder) makeCloseBlocker() (execgraph.AnyResultRef, registerExecCloseBlockerFunc) { waiter, lowerRegister := b.lower.MutableWaiter() registerFunc := registerExecCloseBlockerFunc(func(ref execgraph.AnyResultRef) { - b.mu.Lock() lowerRegister(ref) - b.mu.Unlock() }) return waiter, registerFunc } diff --git a/internal/engine/planning/execgraph_ephemeral.go b/internal/engine/planning/execgraph_ephemeral.go new file mode 100644 index 0000000000..d48c6f7f1a --- /dev/null +++ b/internal/engine/planning/execgraph_ephemeral.go @@ -0,0 +1,59 @@ +// Copyright (c) The OpenTofu Authors +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2023 HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package planning + +import ( + "context" + + "github.com/opentofu/opentofu/internal/engine/internal/execgraph" + "github.com/opentofu/opentofu/internal/lang/eval" + "github.com/zclconf/go-cty/cty" +) + +//////////////////////////////////////////////////////////////////////////////// +// This file contains methods of [execGraphBuilder] that are related to the +// parts of an execution graph that deal with resource instances of mode +// [addrs.EphemeralResourceMode] in particular. +//////////////////////////////////////////////////////////////////////////////// + +// EphemeralResourceSubgraph adds graph nodes needed to apply changes for a +// ephemeral resource instance, and returns what should be used as its final +// result to propagate into to downstream references. +// +// TODO: This is definitely not sufficient for the full complexity of all of the +// different ways ephemeral resources can potentially need to be handled in an +// execution graph. It's just a simple placeholder adapted from code that was +// originally written inline in [planGlue.planDesiredEphemeralResourceInstance] +// just to preserve the existing functionality for now until we design a more +// complete approach in later work. +func (b *execGraphBuilder) EphemeralResourceInstanceSubgraph(ctx context.Context, desired *eval.DesiredResourceInstance, plannedValue cty.Value, oracle *eval.PlanningOracle) execgraph.ResourceInstanceResultRef { + providerClientRef, closeProviderAfter := b.ProviderInstance(ctx, *desired.ProviderInstance, oracle) + + b.mu.Lock() + defer b.mu.Unlock() + + closeWait, registerCloseBlocker := b.makeCloseBlocker() + b.openEphemeralRefs.Put(desired.Addr, registerCloseBlocker) + + // We need to explicitly model our dependency on any upstream resource + // instances in the resource instance graph. These don't naturally emerge + // from the data flow because these results are intermediated through the + // evaluator, which indirectly incorporates the results into the + // desiredInstRef result we'll build below. + dependencyWaiter, closeDependencyAfter := b.waiterForResourceInstances(desired.RequiredResourceInstances.All()) + + instAddrRef := b.lower.ConstantResourceInstAddr(desired.Addr) + desiredInstRef := b.lower.ResourceInstanceDesired(instAddrRef, dependencyWaiter) + + openRef := b.lower.EphemeralOpen(desiredInstRef, providerClientRef) + stateRef := b.lower.EphemeralState(openRef) + closeRef := b.lower.EphemeralClose(openRef, closeWait) + + closeDependencyAfter(closeRef) + closeProviderAfter(closeRef) + + return stateRef +} diff --git a/internal/engine/planning/execgraph_managed.go b/internal/engine/planning/execgraph_managed.go index 3bc1b0d733..9bdf7476f9 100644 --- a/internal/engine/planning/execgraph_managed.go +++ b/internal/engine/planning/execgraph_managed.go @@ -6,6 +6,8 @@ package planning import ( + "context" + "github.com/zclconf/go-cty/cty" "github.com/opentofu/opentofu/internal/engine/internal/exec" @@ -29,15 +31,18 @@ import ( // originally written inline in [planGlue.planDesiredManagedResourceInstance] // just to preserve the existing functionality for now until we design a more // complete approach in later work. -func (b *execGraphBuilder) ManagedResourceInstanceSubgraph(desired *eval.DesiredResourceInstance, plannedValue cty.Value) execgraph.ResourceInstanceResultRef { +func (b *execGraphBuilder) ManagedResourceInstanceSubgraph(ctx context.Context, desired *eval.DesiredResourceInstance, plannedValue cty.Value, oracle *eval.PlanningOracle) execgraph.ResourceInstanceResultRef { + providerClientRef, closeProviderAfter := b.ProviderInstance(ctx, *desired.ProviderInstance, oracle) + + b.mu.Lock() + defer b.mu.Unlock() + // We need to explicitly model our dependency on any upstream resource // instances in the resource instance graph. These don't naturally emerge // from the data flow because these results are intermediated through the // evaluator, which indirectly incorporates the results into the // desiredInstRef result we'll build below. - dependencyWaiter := b.waiterForResourceInstances(desired.RequiredResourceInstances.All()) - - providerClientRef, closeProviderAfter := b.ProviderInstance(*desired.ProviderInstance, b.lower.Waiter()) + dependencyWaiter, closeDependencyAfter := b.waiterForResourceInstances(desired.RequiredResourceInstances.All()) // FIXME: If this is one of the "replace" actions then we need to generate // a more complex graph that has two pairs of "final plan" and "apply". @@ -56,7 +61,9 @@ func (b *execGraphBuilder) ManagedResourceInstanceSubgraph(desired *eval.Desired execgraph.NilResultRef[*exec.ResourceInstanceObject](), providerClientRef, ) + closeProviderAfter(finalResultRef) + closeDependencyAfter(finalResultRef) return finalResultRef } diff --git a/internal/engine/planning/execgraph_provider.go b/internal/engine/planning/execgraph_provider.go index abc3806ef5..4635a91e5a 100644 --- a/internal/engine/planning/execgraph_provider.go +++ b/internal/engine/planning/execgraph_provider.go @@ -6,9 +6,12 @@ package planning import ( + "context" + "github.com/opentofu/opentofu/internal/addrs" "github.com/opentofu/opentofu/internal/engine/internal/exec" "github.com/opentofu/opentofu/internal/engine/internal/execgraph" + "github.com/opentofu/opentofu/internal/lang/eval" ) //////////////////////////////////////////////////////////////////////////////// @@ -29,10 +32,17 @@ import ( // the graph at once, although each distinct provider instance address gets // only one set of nodes added and then subsequent calls get references to // the same operation results. -func (b *execGraphBuilder) ProviderInstance(addr addrs.AbsProviderInstanceCorrect, waitFor execgraph.AnyResultRef) (execgraph.ResultRef[*exec.ProviderClient], registerExecCloseBlockerFunc) { +func (b *execGraphBuilder) ProviderInstance(ctx context.Context, addr addrs.AbsProviderInstanceCorrect, oracle *eval.PlanningOracle) (execgraph.ResultRef[*exec.ProviderClient], registerExecCloseBlockerFunc) { b.mu.Lock() defer b.mu.Unlock() + resourceDependencies := addrs.MakeSet[addrs.AbsResourceInstance]() + rawDependencies := oracle.ProviderInstanceResourceDependencies(ctx, addr) + for dep := range rawDependencies { + resourceDependencies.Add(dep.Addr) + } + dependencyWaiter, closeDependencyAfter := b.waiterForResourceInstances(resourceDependencies.All()) + // FIXME: This is an adaptation of an earlier attempt at this where this // helper was in the underlying execgraph.Builder type, built to work // without any help from the planning engine. But this design isn't @@ -58,14 +68,13 @@ func (b *execGraphBuilder) ProviderInstance(addr addrs.AbsProviderInstanceCorrec if existing, ok := b.openProviderRefs.GetOk(addr); ok { return existing.Result, existing.CloseBlockerFunc } - configResult := b.lower.ProviderInstanceConfig(addrResult, waitFor) + configResult := b.lower.ProviderInstanceConfig(addrResult, dependencyWaiter) openResult := b.lower.ProviderInstanceOpen(configResult) closeWait, registerCloseBlocker := b.makeCloseBlocker() - // Nothing actually depends on the result of the "close" operation, but - // eventual execution of the graph will still wait for it to complete - // because _all_ operations must complete before execution is considered - // to be finished. - _ = b.lower.ProviderInstanceClose(openResult, closeWait) + + closeRef := b.lower.ProviderInstanceClose(openResult, closeWait) + closeDependencyAfter(closeRef) + b.openProviderRefs.Put(addr, execResultWithCloseBlockers[*exec.ProviderClient]{ Result: openResult, CloseBlockerResult: closeWait, diff --git a/internal/engine/planning/execgraph_resource.go b/internal/engine/planning/execgraph_resource.go index 44828bcf12..b78f8a76c2 100644 --- a/internal/engine/planning/execgraph_resource.go +++ b/internal/engine/planning/execgraph_resource.go @@ -45,8 +45,6 @@ func (b *execGraphBuilder) SetResourceInstanceFinalStateResult(addr addrs.AbsRes // system that caused the construction of subgraphs for different resource // instances to happen in the wrong order. func (b *execGraphBuilder) resourceInstanceFinalStateResult(addr addrs.AbsResourceInstance) execgraph.AnyResultRef { - b.mu.Lock() - defer b.mu.Unlock() return b.lower.ResourceInstanceFinalStateResult(addr) } @@ -60,11 +58,21 @@ func (b *execGraphBuilder) resourceInstanceFinalStateResult(addr addrs.AbsResour // The value of the returned result is not actually meaningful; it's used only // for its blocking behavior to add additional ordering constraints to an // execution graph. -func (b *execGraphBuilder) waiterForResourceInstances(instAddrs iter.Seq[addrs.AbsResourceInstance]) execgraph.AnyResultRef { +// +// The function returned allows callers to ensure any dependency resources +// that stay "open" will not be closed until the given references has completed. +func (b *execGraphBuilder) waiterForResourceInstances(instAddrs iter.Seq[addrs.AbsResourceInstance]) (execgraph.AnyResultRef, registerExecCloseBlockerFunc) { var dependencyResults []execgraph.AnyResultRef for instAddr := range instAddrs { depInstResult := b.resourceInstanceFinalStateResult(instAddr) dependencyResults = append(dependencyResults, depInstResult) } - return b.lower.Waiter(dependencyResults...) + + return b.lower.Waiter(dependencyResults...), func(ref execgraph.AnyResultRef) { + for instAddr := range instAddrs { + if instAddr.Resource.Resource.Mode == addrs.EphemeralResourceMode { + b.openEphemeralRefs.Get(instAddr)(ref) + } + } + } } diff --git a/internal/engine/planning/plan.go b/internal/engine/planning/plan.go index b585a33352..92c97d44dc 100644 --- a/internal/engine/planning/plan.go +++ b/internal/engine/planning/plan.go @@ -74,9 +74,9 @@ func PlanChanges(ctx context.Context, prevRoundState *states.State, configInst * // here but we'll still produce a best-effort [plans.Plan] describing // the situation because that often gives useful information for debugging // what caused the errors. - plan := planCtx.Close() + plan, moreDiags := planCtx.Close(ctx) plan.Errored = true - return plan, diags + return plan, diags.Append(moreDiags) } if evalResult == nil { // This should not happen: we should always have an evalResult if @@ -125,5 +125,10 @@ func PlanChanges(ctx context.Context, prevRoundState *states.State, configInst * } } - return planCtx.Close(), diags + plan, moreDiags := planCtx.Close(ctx) + diags = diags.Append(moreDiags) + if diags.HasErrors() { + plan.Errored = true + } + return plan, diags } diff --git a/internal/engine/planning/plan_context.go b/internal/engine/planning/plan_context.go index 2c353aba10..c4493273ff 100644 --- a/internal/engine/planning/plan_context.go +++ b/internal/engine/planning/plan_context.go @@ -6,15 +6,18 @@ package planning import ( + "context" "log" + "slices" + "sync" "github.com/opentofu/opentofu/internal/addrs" - "github.com/opentofu/opentofu/internal/engine/lifecycle" "github.com/opentofu/opentofu/internal/engine/plugins" "github.com/opentofu/opentofu/internal/lang/eval" "github.com/opentofu/opentofu/internal/logging" "github.com/opentofu/opentofu/internal/plans" "github.com/opentofu/opentofu/internal/states" + "github.com/opentofu/opentofu/internal/tfdiags" ) // planContext is our shared state for the various parts of a single call @@ -45,15 +48,18 @@ type planContext struct { // of prevRoundState. refreshedState *states.SyncState - completion *completionTracker + providers plugins.Providers providerInstances *providerInstances - providers plugins.Providers - - // TODO: something to track which ephemeral resource instances are currently - // open? (Do we actually need that, or can we just rely on a background - // goroutine to babysit those based on the completion tracker?) + // Stack of ephemeral and provider close functions + // Given the current state of the planning engine, we wait until + // the end of the run to close all of the "opened" items. We + // also need to close them in a specific order to prevent dependency + // conflicts. We posit that for plan, closing in the reverse order of opens + // will ensure that this order is correctly preserved. + closeStackMu sync.Mutex + closeStack []func(context.Context) tfdiags.Diagnostics } func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, providers plugins.Providers) *planContext { @@ -63,8 +69,6 @@ func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, pro changes := plans.NewChanges() refreshedState := prevRoundState.DeepCopy() - completion := lifecycle.NewCompletionTracker[completionEvent]() - execGraphBuilder := newExecGraphBuilder() return &planContext{ @@ -73,8 +77,7 @@ func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, pro execGraphBuilder: execGraphBuilder, prevRoundState: prevRoundState, refreshedState: refreshedState.SyncWrapper(), - completion: completion, - providerInstances: newProviderInstances(completion), + providerInstances: newProviderInstances(), providers: providers, } } @@ -84,16 +87,8 @@ func newPlanContext(evalCtx *eval.EvalContext, prevRoundState *states.State, pro // // After calling this function the [planContext] object is invalid and must // not be used anymore. -func (p *planContext) Close() *plans.Plan { - // Before we return we'll make sure our completion tracker isn't waiting - // for anything else to complete, so that we can unblock closing of - // any provider instances or ephemeral resource instances that might've - // got left behind by panics/etc. We should not be relying on this in the - // happy path. - for event := range p.completion.PendingItems() { - log.Printf("[TRACE] planContext: synthetic completion of %#v", event) - p.completion.ReportCompletion(event) - } +func (p *planContext) Close(ctx context.Context) (*plans.Plan, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics // We'll freeze the execution graph into a serialized form here, so that // we can recover an equivalent execution graph again during the apply @@ -102,6 +97,15 @@ func (p *planContext) Close() *plans.Plan { if logging.IsDebugOrHigher() { log.Println("[DEBUG] Planned execution graph:\n" + logging.Indent(execGraph.DebugRepr())) } + + p.closeStackMu.Lock() + defer p.closeStackMu.Unlock() + + slices.Reverse(p.closeStack) + for _, closer := range p.closeStack { + diags = diags.Append(closer(ctx)) + } + execGraphOpaque := execGraph.Marshal() return &plans.Plan{ @@ -119,5 +123,5 @@ func (p *planContext) Close() *plans.Plan { // graph so we can round-trip it through saved plan files while // the CLI layer is still working in terms of [plans.Plan]. ExecutionGraph: execGraphOpaque, - } + }, diags } diff --git a/internal/engine/planning/plan_data.go b/internal/engine/planning/plan_data.go index efe9b7b428..849140c03f 100644 --- a/internal/engine/planning/plan_data.go +++ b/internal/engine/planning/plan_data.go @@ -20,8 +20,6 @@ import ( ) func (p *planGlue) planDesiredDataResourceInstance(ctx context.Context, inst *eval.DesiredResourceInstance, egb *execGraphBuilder) (cty.Value, execgraph.ResourceInstanceResultRef, tfdiags.Diagnostics) { - // Regardless of outcome we'll always report that we completed planning. - defer p.planCtx.reportResourceInstancePlanCompletion(inst.Addr) var diags tfdiags.Diagnostics validateDiags := p.planCtx.providers.ValidateResourceConfig(ctx, inst.Provider, inst.ResourceMode, inst.ResourceType, inst.ConfigVal) @@ -91,8 +89,6 @@ func (p *planGlue) planDesiredDataResourceInstance(ctx context.Context, inst *ev } func (p *planGlue) planOrphanDataResourceInstance(_ context.Context, addr addrs.AbsResourceInstance, state *states.ResourceInstanceObjectFullSrc, egb *execGraphBuilder) tfdiags.Diagnostics { - // Regardless of outcome we'll always report that we completed planning. - defer p.planCtx.reportResourceInstancePlanCompletion(addr) var diags tfdiags.Diagnostics // An orphan data object is always just discarded completely, because diff --git a/internal/engine/planning/plan_ephemeral.go b/internal/engine/planning/plan_ephemeral.go index 0c3328dacc..005c7b14ae 100644 --- a/internal/engine/planning/plan_ephemeral.go +++ b/internal/engine/planning/plan_ephemeral.go @@ -7,24 +7,64 @@ package planning import ( "context" + "fmt" "github.com/opentofu/opentofu/internal/engine/internal/execgraph" "github.com/opentofu/opentofu/internal/lang/eval" + "github.com/opentofu/opentofu/internal/shared" "github.com/opentofu/opentofu/internal/tfdiags" "github.com/zclconf/go-cty/cty" ) func (p *planGlue) planDesiredEphemeralResourceInstance(ctx context.Context, inst *eval.DesiredResourceInstance, egb *execGraphBuilder) (cty.Value, execgraph.ResourceInstanceResultRef, tfdiags.Diagnostics) { - // Regardless of outcome we'll always report that we completed planning. - defer p.planCtx.reportResourceInstancePlanCompletion(inst.Addr) var diags tfdiags.Diagnostics - validateDiags := p.planCtx.providers.ValidateResourceConfig(ctx, inst.Provider, inst.Addr.Resource.Resource.Mode, inst.Addr.Resource.Resource.Type, inst.ConfigVal) - diags = diags.Append(validateDiags) - if diags.HasErrors() { - return cty.DynamicVal, nil, diags + schema, _ := p.planCtx.providers.ResourceTypeSchema(ctx, inst.Provider, inst.Addr.Resource.Resource.Mode, inst.Addr.Resource.Resource.Type) + if schema == nil || schema.Block == nil { + // Should be caught during validation, so we don't bother with a pretty error here + diags = diags.Append(fmt.Errorf("provider %q does not support ephemeral resource %q", inst.ProviderInstance, inst.Addr.Resource.Resource.Type)) + return cty.NilVal, nil, diags } - // TODO: Implement - panic("unimplemented") + if inst.ProviderInstance == nil { + // If we don't even know which provider instance we're supposed to be + // talking to then we'll just return a placeholder value, because + // we don't have any way to generate a speculative plan. + return cty.NilVal, nil, diags + } + + providerClient, moreDiags := p.providerClient(ctx, *inst.ProviderInstance) + if providerClient == nil { + moreDiags = moreDiags.Append(tfdiags.AttributeValue( + tfdiags.Error, + "Provider instance not available", + fmt.Sprintf("Cannot plan %s because its associated provider instance %s cannot initialize.", inst.Addr, *inst.ProviderInstance), + nil, + )) + } + diags = diags.Append(moreDiags) + if moreDiags.HasErrors() { + return cty.NilVal, nil, diags + } + + newVal, closeFunc, openDiags := shared.OpenEphemeralResourceInstance( + ctx, + inst.Addr, + schema.Block, + *inst.ProviderInstance, + providerClient, + inst.ConfigVal, + shared.EphemeralResourceHooks{}, + ) + diags = diags.Append(openDiags) + if openDiags.HasErrors() { + return cty.NilVal, nil, diags + } + + p.planCtx.closeStackMu.Lock() + p.planCtx.closeStack = append(p.planCtx.closeStack, closeFunc) + p.planCtx.closeStackMu.Unlock() + + resRef := egb.EphemeralResourceInstanceSubgraph(ctx, inst, newVal, p.oracle) + return newVal, resRef, diags } diff --git a/internal/engine/planning/plan_eval_glue.go b/internal/engine/planning/plan_eval_glue.go index 8cdf83ef52..47f39a170b 100644 --- a/internal/engine/planning/plan_eval_glue.go +++ b/internal/engine/planning/plan_eval_glue.go @@ -297,49 +297,6 @@ func (p *planGlue) providerClient(ctx context.Context, addr addrs.AbsProviderIns return p.planCtx.providerInstances.ProviderClient(ctx, addr, p) } -// providerInstanceCompletionEvents returns all of the [completionEvent] values -// that need to have been reported to the completion tracker before an -// instance of the given provider can be closed. -func (p *planGlue) providerInstanceCompletionEvents(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) iter.Seq[completionEvent] { - return func(yield func(completionEvent) bool) { - configUsers := p.oracle.ProviderInstanceUsers(ctx, addr) - for _, resourceInstAddr := range configUsers.ResourceInstances { - event := resourceInstancePlanningComplete{resourceInstAddr.UniqueKey()} - if !yield(event) { - return - } - } - // We also need to wait for the completion of anything we can find - // in the state, just in case any resource instances are "orphaned" - // and in case there are any deposed objects we need to deal with. - for _, modState := range p.planCtx.prevRoundState.Modules { - for _, resourceState := range modState.Resources { - for instKey, instanceState := range resourceState.Instances { - resourceInstAddr := resourceState.Addr.Instance(instKey) - // FIXME: State is still using the not-quite-right address - // types for provider instances, so we'll shim here. - providerInstAddr := resourceState.ProviderConfig.InstanceCorrect(instanceState.ProviderKey) - if !addr.Equal(providerInstAddr) { - continue // not for this provider instance - } - if instanceState.Current != nil { - event := resourceInstancePlanningComplete{resourceInstAddr.UniqueKey()} - if !yield(event) { - return - } - } - for dk := range instanceState.Deposed { - event := resourceInstanceDeposedPlanningComplete{resourceInstAddr.UniqueKey(), dk} - if !yield(event) { - return - } - } - } - } - } - } -} - func (p *planGlue) desiredResourceInstanceMustBeDeferred(inst *eval.DesiredResourceInstance) bool { // There are various reasons why we might need to defer final planning // of this to a later round. The following is not exhaustive but is a diff --git a/internal/engine/planning/plan_managed.go b/internal/engine/planning/plan_managed.go index a48376bbae..4dc0ce89d8 100644 --- a/internal/engine/planning/plan_managed.go +++ b/internal/engine/planning/plan_managed.go @@ -23,9 +23,6 @@ import ( ) func (p *planGlue) planDesiredManagedResourceInstance(ctx context.Context, inst *eval.DesiredResourceInstance, egb *execGraphBuilder) (plannedVal cty.Value, applyResultRef execgraph.ResourceInstanceResultRef, diags tfdiags.Diagnostics) { - // Regardless of outcome we'll always report that we completed planning. - defer p.planCtx.reportResourceInstancePlanCompletion(inst.Addr) - // There are various reasons why we might need to defer final planning // of this to a later round. The following is not exhaustive but is a // placeholder to show where deferral might fit in. @@ -262,24 +259,18 @@ func (p *planGlue) planDesiredManagedResourceInstance(ctx context.Context, inst // and reasonable. In particular, these subgraph-building methods should // be easily unit-testable due to not depending on anything other than // their input. - finalResultRef := egb.ManagedResourceInstanceSubgraph(inst, planResp.PlannedState) + finalResultRef := egb.ManagedResourceInstanceSubgraph(ctx, inst, planResp.PlannedState, p.oracle) // Our result value for ongoing downstream planning is the planned new state. return planResp.PlannedState, finalResultRef, diags } func (p *planGlue) planOrphanManagedResourceInstance(ctx context.Context, addr addrs.AbsResourceInstance, state *states.ResourceInstanceObjectFullSrc, egb *execGraphBuilder) tfdiags.Diagnostics { - // Regardless of outcome we'll always report that we completed planning. - defer p.planCtx.reportResourceInstancePlanCompletion(addr) - // TODO: Implement panic("unimplemented") } func (p *planGlue) planDeposedManagedResourceInstanceObject(ctx context.Context, addr addrs.AbsResourceInstance, deposedKey states.DeposedKey, state *states.ResourceInstanceObjectFullSrc, egb *execGraphBuilder) tfdiags.Diagnostics { - // Regardless of outcome we'll always report that we completed planning. - defer p.planCtx.reportResourceInstanceDeposedPlanCompletion(addr, deposedKey) - // TODO: Implement panic("unimplemented") } diff --git a/internal/engine/planning/providers.go b/internal/engine/planning/providers.go index 69181642bb..96c488ed85 100644 --- a/internal/engine/planning/providers.go +++ b/internal/engine/planning/providers.go @@ -29,17 +29,11 @@ type providerInstances struct { // before waiting on an object retrieved from it. active addrs.Map[addrs.AbsProviderInstanceCorrect, *grapheval.Once[providers.Configured]] activeMu sync.Mutex - - // completion is a [completionTracker] that's shared with the [planCtx] - // we belong to so that we can detect when all of the work of each particular - // provider instance has completed. - completion *completionTracker } -func newProviderInstances(completion *completionTracker) *providerInstances { +func newProviderInstances() *providerInstances { return &providerInstances{ - active: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, *grapheval.Once[providers.Configured]](), - completion: completion, + active: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, *grapheval.Once[providers.Configured]](), } } @@ -69,6 +63,7 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP pi.activeMu.Unlock() oracle := planGlue.oracle + planCtx := planGlue.planCtx once := pi.active.Get(addr) return once.Do(ctx, func(ctx context.Context) (providers.Configured, tfdiags.Diagnostics) { configVal := oracle.ProviderInstanceConfig(ctx, addr) @@ -81,6 +76,7 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP // be performed. return nil, nil } + // If _this_ call fails then unfortunately we'll end up duplicating // its diagnostics for every resource instance that depends on this // provider instance, which is not ideal but we don't currently have @@ -92,33 +88,29 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP // then this should return "nil, nil" in the error case so that the // caller will treat it the same as a "configuration not valid enough" // problem. - ret, diags := planGlue.planCtx.providers.NewConfiguredProvider(ctx, addr.Config.Config.Provider, configVal) + ret, diags := planCtx.providers.NewConfiguredProvider(ctx, addr.Config.Config.Provider, configVal) + + closeCh := make(chan struct{}) + + planCtx.closeStackMu.Lock() + planCtx.closeStack = append(planCtx.closeStack, func(ctx context.Context) tfdiags.Diagnostics { + println("CLOSING PROVIDER " + addr.String()) + closeCh <- struct{}{} + return tfdiags.Diagnostics{}.Append(ret.Close(ctx)) + }) + planCtx.closeStackMu.Unlock() // This background goroutine deals with closing the provider once it's // no longer needed, and with asking it to gracefully stop if our // given context is cancelled. - waitCh := pi.completion.NewWaiterFor(planGlue.providerInstanceCompletionEvents(ctx, addr)) go func() { - // Once this goroutine is complete the provider instance should be - // treated as closed. - defer planGlue.planCtx.reportProviderInstanceClosed(addr) - cancelCtx := ctx withoutCancelCtx := context.WithoutCancel(ctx) for { select { - case <-waitCh: - // Everything that we were expecting to use the provider - // instance has now completed, so we can close it. - // - // (An error from this goes nowhere. If we want to track - // this then maybe we _should_ switch to having a central - // diags repository inside the providerInstances object, - // as discussed above for failing NewConfiguredProvider, - // and then we could write this failure into there.) - if ret != nil { - _ = ret.Close(withoutCancelCtx) - } + case <-closeCh: + // Close() has been called from within the closers + // No further actions are nessesary return case <-cancelCtx.Done(): // If the context we were given is cancelled then we'll @@ -128,10 +120,7 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP if ret != nil { _ = ret.Stop(withoutCancelCtx) } - // We'll now replace cancelCtx with the one guaranteed - // to never be cancelled so that we'll block until waitCh - // is closed. - cancelCtx = withoutCancelCtx + return } } }() diff --git a/internal/engine/plugins/plugins.go b/internal/engine/plugins/plugins.go index beb231e50f..daf8693fab 100644 --- a/internal/engine/plugins/plugins.go +++ b/internal/engine/plugins/plugins.go @@ -110,8 +110,9 @@ func (n *newRuntimePlugins) NewConfiguredProvider(ctx context.Context, provider return nil, diags } + unmarkedConfigVal, _ := configVal.UnmarkDeep() resp := inst.ConfigureProvider(ctx, providers.ConfigureProviderRequest{ - Config: configVal, + Config: unmarkedConfigVal, // We aren't actually Terraform, so we'll just pretend to be a // Terraform version that has roughly the same functionality that @@ -197,8 +198,9 @@ func (n *newRuntimePlugins) ValidateProviderConfig(ctx context.Context, provider return diags } + unmarkedConfigVal, _ := configVal.UnmarkDeep() resp := inst.ValidateProviderConfig(ctx, providers.ValidateProviderConfigRequest{ - Config: configVal, + Config: unmarkedConfigVal, }) diags = diags.Append(resp.Diagnostics) return diags diff --git a/internal/lang/eval/config_plan.go b/internal/lang/eval/config_plan.go index c2e413f384..dc6bf06e15 100644 --- a/internal/lang/eval/config_plan.go +++ b/internal/lang/eval/config_plan.go @@ -115,15 +115,12 @@ type PlanGlue interface { // actions to destroy any instances that are currently tracked but no longer // configured. func (c *ConfigInstance) DrivePlanning(ctx context.Context, buildGlue func(*PlanningOracle) PlanGlue) (*PlanningResult, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + // All of our work will be associated with a workgraph worker that serves // as the initial worker node in the work graph. ctx = grapheval.ContextWithNewWorker(ctx) - relationships, diags := c.prepareToPlan(ctx) - if diags.HasErrors() { - return nil, diags - } - // We have a little chicken vs. egg problem here where we can't fully // initialize the oracle until we've built the root module instance, // so we initially pass an intentionally-invalid oracle to the build @@ -142,7 +139,6 @@ func (c *ConfigInstance) DrivePlanning(ctx context.Context, buildGlue func(*Plan } // We can now initialize the planning oracle, before we start evaluating // anything that might cause calls to the evalGlue object. - oracle.relationships = relationships oracle.rootModuleInstance = rootModuleInstance oracle.evalContext = c.evalContext diff --git a/internal/lang/eval/config_plan_oracle.go b/internal/lang/eval/config_plan_oracle.go index 2e51daced0..77424baa5d 100644 --- a/internal/lang/eval/config_plan_oracle.go +++ b/internal/lang/eval/config_plan_oracle.go @@ -7,11 +7,12 @@ package eval import ( "context" - "fmt" + "iter" "github.com/zclconf/go-cty/cty" "github.com/opentofu/opentofu/internal/addrs" + "github.com/opentofu/opentofu/internal/lang/eval/internal/configgraph" "github.com/opentofu/opentofu/internal/lang/eval/internal/evalglue" "github.com/opentofu/opentofu/internal/lang/grapheval" ) @@ -19,8 +20,6 @@ import ( // A PlanningOracle provides information from the configuration that is needed // by the planning engine to help orchestrate the planning process. type PlanningOracle struct { - relationships *ResourceRelationships - // NOTE: Any method of PlanningOracle that interacts with methods of // this or anything accessible through it MUST use // [grapheval.ContextWithNewWorker] to make sure it's using a @@ -63,50 +62,14 @@ func (o *PlanningOracle) ProviderInstanceConfig(ctx context.Context, addr addrs. return ret } -// ProviderInstanceUsers returns an object representing which resource instances -// are associated with the provider instance that has the given address. -// -// The planning phase must keep the provider open at least long enough for -// all of the reported resource instances to be planned. -// -// Note that the planning engine will need to plan destruction of any resource -// instances that aren't in the desired state once -// [ConfigInstance.DrivePlanning] returns, and provider instances involved in -// those followup steps will need to remain open until that other work is -// done. This package is not concerned with those details; that's the planning -// engine's responsibility. -func (o *PlanningOracle) ProviderInstanceUsers(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) ProviderInstanceUsers { +func (o *PlanningOracle) ProviderInstanceResourceDependencies(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) iter.Seq[*configgraph.ResourceInstance] { ctx = grapheval.ContextWithNewWorker(ctx) - _ = ctx // not using this right now, but keeping this to remind future maintainers that we'd need this - return o.relationships.ProviderInstanceUsers.Get(addr) -} - -// EphemeralResourceInstanceUsers returns an object describing which other -// resource instances and providers rely on the result value of the ephemeral -// resource with the given address. -// -// The planning phase must keep the ephemeral resource instance open at least -// long enough for all of the reported resource instances to be planned and -// for all of the reported provider instances to be closed. -// -// The given address must be for an ephemeral resource instance or this function -// will panic. -// -// Note that the planning engine will need to plan destruction of any resource -// instances that aren't in the desired state once -// [ConfigInstance.DrivePlanning] returns, and provider instances involved in -// those followup steps might be included in a result from this method, in -// which case the planning phase must hold the provider instance open long -// enough to complete those followup steps. -func (o *PlanningOracle) EphemeralResourceInstanceUsers(ctx context.Context, addr addrs.AbsResourceInstance) EphemeralResourceInstanceUsers { - ctx = grapheval.ContextWithNewWorker(ctx) - _ = ctx // not using this right now, but keeping this to remind future maintainers that we'd need this - - if addr.Resource.Resource.Mode != addrs.EphemeralResourceMode { - panic(fmt.Sprintf("EphemeralResourceInstanceUsers with non-ephemeral %s", addr)) + providerInst := evalglue.ProviderInstance(ctx, o.rootModuleInstance, addr) + if providerInst == nil { + return nil } - return o.relationships.EphemeralResourceUsers.Get(addr) + return providerInst.ResourceInstanceDependencies(ctx) } func (o *PlanningOracle) EvalContext(ctx context.Context) *EvalContext { diff --git a/internal/lang/eval/config_prepare.go b/internal/lang/eval/config_prepare.go index af502c02dd..b90725dba1 100644 --- a/internal/lang/eval/config_prepare.go +++ b/internal/lang/eval/config_prepare.go @@ -13,123 +13,10 @@ import ( "github.com/opentofu/opentofu/internal/addrs" "github.com/opentofu/opentofu/internal/lang/eval/internal/configgraph" "github.com/opentofu/opentofu/internal/lang/eval/internal/evalglue" - "github.com/opentofu/opentofu/internal/lang/grapheval" "github.com/opentofu/opentofu/internal/plans/objchange" "github.com/opentofu/opentofu/internal/tfdiags" ) -// prepareToPlan implements an extra preparation phase we perform before -// running the main plan phase so we can deal with the "chicken or the egg" -// problem of needing to evaluate configuraton to learn the relationships -// between resources instances and provider instances but needing to already -// know those relationships in order to fully evaluate the configuration. -// -// As a compromise then, we initially perform a more conservative walk that -// just treats all resource instances as having fully-unknown values so that -// we don't need to configure any providers or open any ephemeral resource -// instances, and then ask the resulting configgraph objects to report -// the resource/provider dependencies which we can then use on a subsequent -// re-eval that includes the real provider planning actions. -// -// This approach relies on the idea that an evaluation with more unknown -// values should always produce a superset of the true dependencies that -// would be reported once there are fewer unknown values, and so the -// result of this function should capture _at least_ the dependencies -// required to successfully plan, possibly also including some harmless -// additional dependency relationships that aren't strictly needed once we can -// evaluate using real resource planning results. The planning phase will -// then be able to produce its own tighter version of this information to -// use when building the execution graph for the apply phase. -func (c *ConfigInstance) prepareToPlan(ctx context.Context) (*ResourceRelationships, tfdiags.Diagnostics) { - // All of our work will be associated with a workgraph worker that serves - // as the initial worker node in the work graph. - ctx = grapheval.ContextWithNewWorker(ctx) - - rootModuleInstance, diags := c.precheckedModuleInstance(ctx) - if diags.HasErrors() { - return nil, diags - } - ret := &ResourceRelationships{ - EphemeralResourceUsers: addrs.MakeMap[addrs.AbsResourceInstance, EphemeralResourceInstanceUsers](), - ProviderInstanceUsers: addrs.MakeMap[addrs.AbsProviderInstanceCorrect, ProviderInstanceUsers](), - } - for depender := range evalglue.ResourceInstancesDeep(ctx, rootModuleInstance) { - dependerAddr := depender.Addr - for dependee := range depender.ResourceInstanceDependencies(ctx) { - dependeeAddr := dependee.Addr - if dependeeAddr.Resource.Resource.Mode == addrs.EphemeralResourceMode { - if !ret.EphemeralResourceUsers.Has(dependeeAddr) { - ret.EphemeralResourceUsers.Put(dependeeAddr, EphemeralResourceInstanceUsers{ - ResourceInstances: addrs.MakeSet[addrs.AbsResourceInstance](), - ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](), - }) - } - set := ret.EphemeralResourceUsers.Get(dependeeAddr).ResourceInstances - set.Add(dependerAddr) - } - } - providerInst, _, _ := depender.ProviderInstance(ctx) - if providerInst, known := configgraph.GetKnown(providerInst); known { - providerInstAddr := providerInst.Addr - if !ret.ProviderInstanceUsers.Has(providerInstAddr) { - ret.ProviderInstanceUsers.Put(providerInstAddr, ProviderInstanceUsers{ - ResourceInstances: addrs.MakeSet[addrs.AbsResourceInstance](), - }) - } - set := ret.ProviderInstanceUsers.Get(providerInstAddr).ResourceInstances - set.Add(dependerAddr) - } - } - for depender := range evalglue.ProviderInstancesDeep(ctx, rootModuleInstance) { - dependerAddr := depender.Addr - for dependee := range depender.ResourceInstanceDependencies(ctx) { - dependeeAddr := dependee.Addr - if dependeeAddr.Resource.Resource.Mode == addrs.EphemeralResourceMode { - if !ret.EphemeralResourceUsers.Has(dependeeAddr) { - ret.EphemeralResourceUsers.Put(dependeeAddr, EphemeralResourceInstanceUsers{ - ResourceInstances: addrs.MakeSet[addrs.AbsResourceInstance](), - ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](), - }) - } - set := ret.EphemeralResourceUsers.Get(dependeeAddr).ProviderInstances - set.Add(dependerAddr) - } - } - } - return ret, diags -} - -type ResourceRelationships struct { - // EphemeralResourceUsers is a map from ephemeral resource instance - // addresses to the sets of addresses of other resource instances (of - // any mode, including other ephemeral ones) which depend on them. - // - // A subsequent plan phase can use this to detect when all of the - // downstream users of an ephemeral resource instance have finished - // their work and so it's okay to close the ephemeral resource instance. - // - // TODO: This should also capture the provider instances that are depending - // on each ephemeral resource instance. - EphemeralResourceUsers addrs.Map[addrs.AbsResourceInstance, EphemeralResourceInstanceUsers] - - // EphemeralResourceUsers is a map from provider instance addresses to the - // sets of addresses of resource instances which a provided by them. - // - // A subsequent plan phase can use this to detect when all of the - // downstream users of a provider instance have finished their work and so - // it's okay to close the provider instance. - ProviderInstanceUsers addrs.Map[addrs.AbsProviderInstanceCorrect, ProviderInstanceUsers] -} - -type EphemeralResourceInstanceUsers struct { - ResourceInstances addrs.Set[addrs.AbsResourceInstance] - ProviderInstances addrs.Set[addrs.AbsProviderInstanceCorrect] -} - -type ProviderInstanceUsers struct { - ResourceInstances addrs.Set[addrs.AbsResourceInstance] -} - // precheckedModuleInstance deals with the common part of both // [ConfigInstance.prepareToPlan] and [ConfigInstance.Validate], where we // evaluate the configuration using unknown value placeholders for resource diff --git a/internal/lang/eval/config_prepare_test.go b/internal/lang/eval/config_prepare_test.go deleted file mode 100644 index 0b29927d9c..0000000000 --- a/internal/lang/eval/config_prepare_test.go +++ /dev/null @@ -1,391 +0,0 @@ -// Copyright (c) The OpenTofu Authors -// SPDX-License-Identifier: MPL-2.0 -// Copyright (c) 2023 HashiCorp, Inc. -// SPDX-License-Identifier: MPL-2.0 - -package eval - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/zclconf/go-cty/cty" - - "github.com/opentofu/opentofu/internal/addrs" - "github.com/opentofu/opentofu/internal/configs" - "github.com/opentofu/opentofu/internal/configs/configschema" - "github.com/opentofu/opentofu/internal/lang/eval/internal/evalglue" - "github.com/opentofu/opentofu/internal/providers" -) - -// NOTE: Unlike many of the _test.go files in this package, this one is in -// "package eval" itself rather than in "package eval_test", because it's -// testing the "prepareToPlan" implementation detail that isn't part of the -// public API. -// -// If you bring test code from other files into here then you'll probably -// need to remove some "eval." prefixes from references to avoid making this -// package import itself. - -func TestPrepare_ephemeralResourceUsers(t *testing.T) { - configInst, diags := NewConfigInstance(t.Context(), &ConfigCall{ - EvalContext: evalglue.EvalContextForTesting(t, &EvalContext{ - Modules: ModulesForTesting(map[addrs.ModuleSourceLocal]*configs.Module{ - addrs.ModuleSourceLocal("."): configs.ModuleFromStringForTesting(t, ` - terraform { - required_providers { - foo = { - source = "test/foo" - } - } - } - ephemeral "foo" "a" { - count = 2 - - name = "a ${count.index}" - } - ephemeral "foo" "b" { - count = 2 - - name = ephemeral.foo.a[count.index].id - } - locals { - # This is intentionally a more complex expression - # to analyze, to prove that we can still chase the - # instance-specific references through it. - # This produces a tuple of two-element tuples with the - # corresponding ids of ephemeral.foo.a and - # ephemeral.foo.b respectively. - together = [ - for i, a in ephemeral.foo.a : - [a.id, ephemeral.foo.b[i].id] - ] - } - resource "foo" "c" { - count = 2 - - # Even indirectly through this projection of values - # from the two ephemeral resources we should correctly - # detect that foo.c instances are correlated with - # ephemeral.foo.a and ephemeral.foo.b instances of - # the same index. - something = local.together[count.index] - - # The above is really just an overly-complicated way of - # writing this: - # - # something = [ - # ephemeral.foo.a[count.index], - # ephemeral.foo.b[count.index], - # ] - } - provider "foo" { - alias = "other" - - name = ephemeral.foo.a[0].name - } - `), - }), - Providers: ProvidersForTesting(map[addrs.Provider]*providers.GetProviderSchemaResponse{ - addrs.MustParseProviderSourceString("test/foo"): { - Provider: providers.Schema{ - Block: &configschema.Block{ - Attributes: map[string]*configschema.Attribute{ - "name": { - Type: cty.String, - Optional: true, - }, - }, - }, - }, - EphemeralResources: map[string]providers.Schema{ - "foo": { - Block: &configschema.Block{ - Attributes: map[string]*configschema.Attribute{ - "name": { - Type: cty.String, - Required: true, - }, - "id": { - Type: cty.String, - Computed: true, - }, - }, - }, - }, - }, - ResourceTypes: map[string]providers.Schema{ - "foo": { - Block: &configschema.Block{ - Attributes: map[string]*configschema.Attribute{ - "something": { - Type: cty.List(cty.String), - Optional: true, - WriteOnly: true, - }, - }, - }, - }, - }, - }, - }), - }), - RootModuleSource: addrs.ModuleSourceLocal("."), - InputValues: InputValuesForTesting(map[string]cty.Value{}), - }) - if diags.HasErrors() { - t.Fatalf("unexpected errors: %s", diags.Err()) - } - - got, diags := configInst.prepareToPlan(t.Context()) - if diags.HasErrors() { - t.Fatalf("unexpected errors: %s", diags.Err()) - } - - fooA := addrs.Resource{ - Mode: addrs.EphemeralResourceMode, - Type: "foo", - Name: "a", - }.Absolute(addrs.RootModuleInstance) - fooB := addrs.Resource{ - Mode: addrs.EphemeralResourceMode, - Type: "foo", - Name: "b", - }.Absolute(addrs.RootModuleInstance) - fooC := addrs.Resource{ - Mode: addrs.ManagedResourceMode, - Type: "foo", - Name: "c", - }.Absolute(addrs.RootModuleInstance) - inst0 := addrs.IntKey(0) - inst1 := addrs.IntKey(1) - providerInstAddr := addrs.AbsProviderConfigCorrect{ - Module: addrs.RootModuleInstance, - Config: addrs.ProviderConfigCorrect{ - Provider: addrs.MustParseProviderSourceString("test/foo"), - }, - }.Instance(addrs.NoKey) - providerOtherInstAddr := addrs.AbsProviderConfigCorrect{ - Module: addrs.RootModuleInstance, - Config: addrs.ProviderConfigCorrect{ - Provider: addrs.MustParseProviderSourceString("test/foo"), - Alias: "other", - }, - }.Instance(addrs.NoKey) - - // The analysis should detect that: - // - ephemeral.foo.a[0] is used by ephemeral.foo.b[0] and foo.c[0], and by the foo.other provider instance - // - ephemeral.foo.a[1] is used by ephemeral.foo.b[1] and foo.c[1] - // - ephemeral.foo.b[0] is used by only foo.c[0] - // - ephemeral.foo.b[1] is used by only foo.c[1] - // In particular, the evaluator should be able to notice that - // only the correlated instance keys have any relationship between - // them, and so e.g. ephemeral.foo.a[0] is NOT used by ephemeral.foo.b[1]. - // - // This level of precision was not possible in the traditional - // "package tofu" language runtime, because it calculated dependencies - // based only on static analysis, but this new evaluator uses dynamic - // analysis. Refer to [configgraph.ContributingResourceInstances] - // to learn more about how that's meant to work, if you're trying to - // debug a regression here that made the analysis less precise. - want := &ResourceRelationships{ - // Note that this field captures _inverse_ dependencies: the values - // are instances that depend on the keys. - // - // The best way to understand this is that the ephemeral resource - // instance identified in an element's key must remain "open" until all - // of the instances identified in the element's value have finished - // planning. - EphemeralResourceUsers: addrs.MakeMap( - addrs.MakeMapElem(fooA.Instance(inst0), EphemeralResourceInstanceUsers{ - ResourceInstances: addrs.MakeSet( - fooB.Instance(inst0), - fooC.Instance(inst0), - ), - ProviderInstances: addrs.MakeSet( - providerOtherInstAddr, - ), - }), - addrs.MakeMapElem(fooA.Instance(inst1), EphemeralResourceInstanceUsers{ - ResourceInstances: addrs.MakeSet( - fooB.Instance(inst1), - fooC.Instance(inst1), - ), - ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](), - }), - addrs.MakeMapElem(fooB.Instance(inst0), EphemeralResourceInstanceUsers{ - ResourceInstances: addrs.MakeSet( - fooC.Instance(inst0), - ), - ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](), - }), - addrs.MakeMapElem(fooB.Instance(inst1), EphemeralResourceInstanceUsers{ - ResourceInstances: addrs.MakeSet( - fooC.Instance(inst1), - ), - ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](), - }), - ), - - // PrepareToPlan also finds the resources that belong to each - // provider instance, which is not the focus of this test but - // are part of the result nonetheless. - ProviderInstanceUsers: addrs.MakeMap( - addrs.MakeMapElem(providerInstAddr, ProviderInstanceUsers{ - ResourceInstances: addrs.MakeSet( - fooA.Instance(inst0), - fooA.Instance(inst1), - fooB.Instance(inst0), - fooB.Instance(inst1), - fooC.Instance(inst0), - fooC.Instance(inst1), - ), - }), - ), - } - - if diff := cmp.Diff(want, got); diff != "" { - t.Error("wrong result\n" + diff) - } -} - -func TestPrepare_crossModuleReferences(t *testing.T) { - configInst, diags := NewConfigInstance(t.Context(), &ConfigCall{ - EvalContext: evalglue.EvalContextForTesting(t, &EvalContext{ - Modules: ModulesForTesting(map[addrs.ModuleSourceLocal]*configs.Module{ - addrs.ModuleSourceLocal("."): configs.ModuleFromStringForTesting(t, ` - module "a" { - source = "./a" - } - module "b" { - source = "./b" - - name = module.a.name - } - `), - addrs.ModuleSourceLocal("./a"): configs.ModuleFromStringForTesting(t, ` - terraform { - required_providers { - foo = { - source = "test/foo" - } - } - } - provider "foo" {} - ephemeral "foo" "a" { - name = "a" - } - output "name" { - value = ephemeral.foo.a.name - } - `), - addrs.ModuleSourceLocal("./b"): configs.ModuleFromStringForTesting(t, ` - terraform { - required_providers { - foo = { - source = "test/foo" - } - } - } - provider "foo" {} - variable "name" { - type = string - ephemeral = true - } - resource "foo" "b" { - name = var.name - } - `), - }), - Providers: ProvidersForTesting(map[addrs.Provider]*providers.GetProviderSchemaResponse{ - addrs.MustParseProviderSourceString("test/foo"): { - Provider: providers.Schema{ - Block: &configschema.Block{}, - }, - EphemeralResources: map[string]providers.Schema{ - "foo": { - Block: &configschema.Block{ - Attributes: map[string]*configschema.Attribute{ - "name": { - Type: cty.String, - Required: true, - }, - }, - }, - }, - }, - ResourceTypes: map[string]providers.Schema{ - "foo": { - Block: &configschema.Block{ - Attributes: map[string]*configschema.Attribute{ - "name": { - Type: cty.String, - Optional: true, - WriteOnly: true, - }, - }, - }, - }, - }, - }, - }), - }), - RootModuleSource: addrs.ModuleSourceLocal("."), - InputValues: InputValuesForTesting(map[string]cty.Value{}), - }) - if diags.HasErrors() { - t.Fatalf("unexpected errors: %s", diags.Err()) - } - - got, diags := configInst.prepareToPlan(t.Context()) - if diags.HasErrors() { - t.Fatalf("unexpected errors: %s", diags.Err()) - } - - fooA := addrs.Resource{ - Mode: addrs.EphemeralResourceMode, - Type: "foo", - Name: "a", - }.Absolute(addrs.RootModuleInstance.Child("a", addrs.NoKey)) - fooB := addrs.Resource{ - Mode: addrs.ManagedResourceMode, - Type: "foo", - Name: "b", - }.Absolute(addrs.RootModuleInstance.Child("b", addrs.NoKey)) - providerInstAddr := addrs.AbsProviderConfigCorrect{ - Module: addrs.RootModuleInstance, - Config: addrs.ProviderConfigCorrect{ - Provider: addrs.MustParseProviderSourceString("test/foo"), - }, - }.Instance(addrs.NoKey) - - // The analyzer should detect that foo.b in module.b depends on - // ephemeral.foo.a in module.a even though they are declared in - // different modules. - want := &ResourceRelationships{ - EphemeralResourceUsers: addrs.MakeMap( - addrs.MakeMapElem(fooA.Instance(addrs.NoKey), EphemeralResourceInstanceUsers{ - ResourceInstances: addrs.MakeSet( - fooB.Instance(addrs.NoKey), - ), - ProviderInstances: addrs.MakeSet[addrs.AbsProviderInstanceCorrect](), - }), - ), - - // PrepareToPlan also finds the resources that belong to each - // provider instance, which is not the focus of this test but - // are part of the result nonetheless. - ProviderInstanceUsers: addrs.MakeMap( - addrs.MakeMapElem(providerInstAddr, ProviderInstanceUsers{ - ResourceInstances: addrs.MakeSet( - fooA.Instance(addrs.NoKey), - fooB.Instance(addrs.NoKey), - ), - }), - ), - } - - if diff := cmp.Diff(want, got); diff != "" { - t.Error("wrong result\n" + diff) - } -} diff --git a/internal/shared/ephemeral_resource.go b/internal/shared/ephemeral_resource.go new file mode 100644 index 0000000000..fee7a327a8 --- /dev/null +++ b/internal/shared/ephemeral_resource.go @@ -0,0 +1,234 @@ +// Copyright (c) The OpenTofu Authors +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2023 HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package shared + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/hashicorp/hcl/v2" + "github.com/opentofu/opentofu/internal/addrs" + "github.com/opentofu/opentofu/internal/configs/configschema" + "github.com/opentofu/opentofu/internal/providers" + "github.com/opentofu/opentofu/internal/tfdiags" + "github.com/zclconf/go-cty/cty" +) + +type EphemeralResourceHooks struct { + PreOpen func(addrs.AbsResourceInstance) + PostOpen func(addrs.AbsResourceInstance, tfdiags.Diagnostics) + PreRenew func(addrs.AbsResourceInstance) + PostRenew func(addrs.AbsResourceInstance, tfdiags.Diagnostics) + PreClose func(addrs.AbsResourceInstance) + PostClose func(addrs.AbsResourceInstance, tfdiags.Diagnostics) +} + +type EphemeralCloseFunc func(context.Context) tfdiags.Diagnostics + +func OpenEphemeralResourceInstance( + ctx context.Context, + // TODO once we remove the old engine, this can be condensed using the new engine types + addr addrs.AbsResourceInstance, + schema *configschema.Block, + providerAddr addrs.AbsProviderInstanceCorrect, + provider providers.Interface, + configVal cty.Value, + hooks EphemeralResourceHooks, +) (cty.Value, EphemeralCloseFunc, tfdiags.Diagnostics) { + var newVal cty.Value + var diags tfdiags.Diagnostics + + // Unmark before sending to provider, will re-mark before returning + configVal, pvm := configVal.UnmarkDeepWithPaths() + + log.Printf("[TRACE] OpenEphemeralResourceInstance: Re-validating config for %s", addr) + validateResp := provider.ValidateEphemeralConfig( + ctx, + providers.ValidateEphemeralConfigRequest{ + TypeName: addr.ContainingResource().Resource.Type, + Config: configVal, + }, + ) + diags = diags.Append(validateResp.Diagnostics) + if diags.HasErrors() { + return newVal, nil, diags + } + + // If we get down here then our configuration is complete and we're ready + // to actually call the provider to open the ephemeral resource. + log.Printf("[TRACE] OpenEphemeralResourceInstance: %s configuration is complete, so calling the provider", addr) + + if hooks.PreOpen != nil { + hooks.PreOpen(addr) + } + + openReq := providers.OpenEphemeralResourceRequest{ + TypeName: addr.ContainingResource().Resource.Type, + Config: configVal, + } + openResp := provider.OpenEphemeralResource(ctx, openReq) + diags = diags.Append(openResp.Diagnostics) + if diags.HasErrors() { + return newVal, nil, diags + } + + newVal = openResp.Result + + // Encapsulate validation for easier close handling + func() { + for _, err := range newVal.Type().TestConformance(schema.ImpliedType()) { + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + "Provider produced invalid object", + fmt.Sprintf( + "Provider %q produced an invalid value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.", + providerAddr, tfdiags.FormatErrorPrefixed(err, addr.String()), + ), + )) + } + if diags.HasErrors() { + return + } + + if newVal.IsNull() { + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + "Provider produced null object", + fmt.Sprintf( + "Provider %q produced a null value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.", + providerAddr, addr, + ), + )) + return + } + + if !newVal.IsNull() && !newVal.IsWhollyKnown() { + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + "Provider produced invalid object", + fmt.Sprintf( + "Provider %q produced a value for %s that is not wholly known.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.", + providerAddr, addr, + ), + )) + return + } + }() + + if diags.HasErrors() { + // We have an open ephemeral resource, but don't plan to use it due to validation errors + // It needs to be closed before we can return + + closReq := providers.CloseEphemeralResourceRequest{ + TypeName: addr.Resource.Resource.Type, + Private: openResp.Private, + } + closeResp := provider.CloseEphemeralResource(ctx, closReq) + diags = diags.Append(closeResp.Diagnostics) + + return newVal, nil, diags + } + + // TODO see if this conflicts with anything in the new engine? + if len(pvm) > 0 { + newVal = newVal.MarkWithPaths(pvm) + } + + if hooks.PostOpen != nil { + hooks.PostOpen(addr, diags) + } + + // Initialize the closing channel and the channel that sends diagnostics back to the close caller. + closeCh := make(chan context.Context, 1) + diagsCh := make(chan tfdiags.Diagnostics, 1) + go func() { + var diags tfdiags.Diagnostics + renewAt := openResp.RenewAt + privateData := openResp.Private + + closeCtx := ctx + + // We have two exit paths that should take the same route + func() { + for { + // Select on nil chan will block until other case close or done + var renewAtTimer chan time.Time + if renewAt != nil { + time.After(time.Until(*renewAt)) + } + + select { + case <-renewAtTimer: + if hooks.PreRenew != nil { + hooks.PreRenew(addr) + } + + renewReq := providers.RenewEphemeralResourceRequest{ + TypeName: addr.Resource.Resource.Type, + Private: privateData, + } + renewResp := provider.RenewEphemeralResource(ctx, renewReq) + diags = diags.Append(renewResp.Diagnostics) + // TODO consider what happens if renew fails, do we still want to update private? + renewAt = renewResp.RenewAt + + if hooks.PostRenew != nil { + hooks.PostRenew(addr, diags) + } + privateData = renewResp.Private + case closeCtx = <-closeCh: + return + case <-ctx.Done(): + // Even though the context is "Done" we still want to execute the close operation + closeCtx = context.WithoutCancel(closeCtx) + return + } + } + }() + + if hooks.PreClose != nil { + hooks.PreClose(addr) + } + + closReq := providers.CloseEphemeralResourceRequest{ + TypeName: addr.Resource.Resource.Type, + Private: privateData, + } + closeResp := provider.CloseEphemeralResource(closeCtx, closReq) + diags = diags.Append(closeResp.Diagnostics) + + if hooks.PostClose != nil { + hooks.PostClose(addr, diags) + } + + diagsCh <- diags + }() + + closeFunc := func(ctx context.Context) tfdiags.Diagnostics { + closeCh <- ctx + close(closeCh) + defer func() { + close(diagsCh) + }() + + timeout := 10 * time.Second + select { + case d := <-diagsCh: + return d + case <-time.After(timeout): + return tfdiags.Diagnostics{}.Append(&hcl.Diagnostic{ + Severity: hcl.DiagError, + Summary: "Closing ephemeral resource timed out", + Detail: fmt.Sprintf("The ephemeral resource %q timed out on closing after %s", addr.String(), timeout), + //TODO Subject: n.Config.DeclRange.Ptr(), + }) + } + } + + return newVal, closeFunc, diags +} diff --git a/internal/tofu/node_resource_abstract_instance.go b/internal/tofu/node_resource_abstract_instance.go index e721bc0abd..6faf2b51b8 100644 --- a/internal/tofu/node_resource_abstract_instance.go +++ b/internal/tofu/node_resource_abstract_instance.go @@ -11,7 +11,6 @@ import ( "log" "strings" "sync/atomic" - "time" "github.com/hashicorp/hcl/v2" "github.com/zclconf/go-cty/cty" @@ -19,7 +18,7 @@ import ( "github.com/opentofu/opentofu/internal/addrs" "github.com/opentofu/opentofu/internal/checks" - "github.com/opentofu/opentofu/internal/communicator/shared" + commShared "github.com/opentofu/opentofu/internal/communicator/shared" "github.com/opentofu/opentofu/internal/configs" "github.com/opentofu/opentofu/internal/configs/configschema" "github.com/opentofu/opentofu/internal/encryption" @@ -31,6 +30,7 @@ import ( "github.com/opentofu/opentofu/internal/plans/objchange" "github.com/opentofu/opentofu/internal/providers" "github.com/opentofu/opentofu/internal/provisioners" + "github.com/opentofu/opentofu/internal/shared" "github.com/opentofu/opentofu/internal/states" "github.com/opentofu/opentofu/internal/tfdiags" ) @@ -106,9 +106,8 @@ type NodeAbstractResourceInstance struct { // start so calls to NodeAbstractResourceInstance.Close can return no diagnostics whatsoever. // A common reason for which the renewal goroutine can be skipped from being created is when the ephemeral // resource is deferred for the apply phase. - closeCh chan struct{} - ephemeralDiags chan tfdiags.Diagnostics - renewStarted atomic.Bool + ephemeralCloseFn func() tfdiags.Diagnostics + renewStarted atomic.Bool } // NewNodeAbstractResourceInstance creates an abstract resource instance graph @@ -2056,121 +2055,77 @@ func (n *NodeAbstractResourceInstance) providerMetas(ctx context.Context, evalCt func (n *NodeAbstractResourceInstance) openEphemeralResource(ctx context.Context, evalCtx EvalContext, configVal cty.Value) (cty.Value, tfdiags.Diagnostics) { var diags tfdiags.Diagnostics - var newVal cty.Value - config := *n.Config provider, providerSchema, err := n.getProvider(ctx, evalCtx) diags = diags.Append(err) if diags.HasErrors() { - return newVal, diags + return cty.NilVal, diags } schema, _ := providerSchema.SchemaForResourceAddr(n.Addr.ContainingResource().Resource) if schema == nil { // Should be caught during validation, so we don't bother with a pretty error here diags = diags.Append(fmt.Errorf("provider %q does not support ephemeral resource %q", n.ResolvedProvider.ProviderConfig, n.Addr.ContainingResource().Resource.Type)) - return newVal, diags + return cty.NilVal, diags } - // Unmark before sending to provider, will re-mark before returning - var pvm []cty.PathValueMarks - configVal, pvm = configVal.UnmarkDeepWithPaths() - - log.Printf("[TRACE] openEphemeralResource: Re-validating config for %s", n.Addr) - validateResp := provider.ValidateEphemeralConfig( + newVal, closeFn, openDiags := shared.OpenEphemeralResourceInstance( ctx, - providers.ValidateEphemeralConfigRequest{ - TypeName: n.Addr.ContainingResource().Resource.Type, - Config: configVal, + n.Addr, + schema, + n.ResolvedProvider.ProviderConfig.Correct().Instance(n.ResolvedProviderKey), + provider, + configVal, + shared.EphemeralResourceHooks{ + PreOpen: func(addr addrs.AbsResourceInstance) { + _ = evalCtx.Hook(func(h Hook) (HookAction, error) { + return h.PreOpen(addr) + }) + }, + PostOpen: func(addr addrs.AbsResourceInstance, diags tfdiags.Diagnostics) { + _ = evalCtx.Hook(func(h Hook) (HookAction, error) { + return h.PostOpen(addr, diags.Err()) + }) + }, + PreRenew: func(addr addrs.AbsResourceInstance) { + _ = evalCtx.Hook(func(h Hook) (HookAction, error) { + return h.PreRenew(addr) + }) + }, + PostRenew: func(addr addrs.AbsResourceInstance, diags tfdiags.Diagnostics) { + _ = evalCtx.Hook(func(h Hook) (HookAction, error) { + return h.PostRenew(addr, diags.Err()) + }) + }, + PreClose: func(addr addrs.AbsResourceInstance) { + _ = evalCtx.Hook(func(h Hook) (HookAction, error) { + return h.PreClose(addr) + }) + }, + PostClose: func(addr addrs.AbsResourceInstance, diags tfdiags.Diagnostics) { + _ = evalCtx.Hook(func(h Hook) (HookAction, error) { + return h.PostClose(addr, diags.Err()) + }) + }, }, ) - diags = diags.Append(validateResp.Diagnostics.InConfigBody(config.Config, n.Addr.String())) + + diags = diags.Append(openDiags.InConfigBody(config.Config, n.Addr.String())) if diags.HasErrors() { return newVal, diags } - // If we get down here then our configuration is complete and we're ready - // to actually call the provider to open the ephemeral resource. - log.Printf("[TRACE] openEphemeralResource: %s configuration is complete, so calling the provider", n.Addr) - - diags = diags.Append(evalCtx.Hook(func(h Hook) (HookAction, error) { - return h.PreOpen(n.Addr) - })) - if diags.HasErrors() { - return newVal, diags + n.ephemeralCloseFn = func() tfdiags.Diagnostics { + // We use the same context for close here, not sure if we want to consider using the context for the close node instead + return closeFn(ctx).InConfigBody(config.Config, n.Addr.String()) } - req := providers.OpenEphemeralResourceRequest{ - TypeName: n.Addr.ContainingResource().Resource.Type, - Config: configVal, - } - resp := provider.OpenEphemeralResource(ctx, req) - diags = diags.Append(resp.Diagnostics.InConfigBody(config.Config, n.Addr.String())) - if diags.HasErrors() { - return newVal, diags - } - - newVal = resp.Result - - for _, err := range newVal.Type().TestConformance(schema.ImpliedType()) { - diags = diags.Append(tfdiags.Sourceless( - tfdiags.Error, - "Provider produced invalid object", - fmt.Sprintf( - "Provider %q produced an invalid value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.", - n.ResolvedProvider.ProviderConfig.InstanceString(n.ResolvedProviderKey), tfdiags.FormatErrorPrefixed(err, n.Addr.String()), - ), - )) - } - if diags.HasErrors() { - return newVal, diags - } - - if newVal.IsNull() { - diags = diags.Append(tfdiags.Sourceless( - tfdiags.Error, - "Provider produced null object", - fmt.Sprintf( - "Provider %q produced a null value for %s.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.", - n.ResolvedProvider.ProviderConfig.InstanceString(n.ResolvedProviderKey), n.Addr, - ), - )) - return newVal, diags - } - - if !newVal.IsNull() && !newVal.IsWhollyKnown() { - diags = diags.Append(tfdiags.Sourceless( - tfdiags.Error, - "Provider produced invalid object", - fmt.Sprintf( - "Provider %q produced a value for %s that is not wholly known.\n\nThis is a bug in the provider, which should be reported in the provider's own issue tracker.", - n.ResolvedProvider.ProviderConfig.InstanceString(n.ResolvedProviderKey), n.Addr, - ), - )) - return newVal, diags - } - - if len(pvm) > 0 { - newVal = newVal.MarkWithPaths(pvm) - } - diags = diags.Append(evalCtx.Hook(func(h Hook) (HookAction, error) { - return h.PostOpen(n.Addr, diags.Err()) - })) - - // Initialize the closing channel and the channel that sends diagnostics back to the - // NodeAbstractResourceInstance.Close caller. - n.closeCh = make(chan struct{}, 1) - n.ephemeralDiags = make(chan tfdiags.Diagnostics, 1) // Due to the go scheduler inner works, the goroutine spawned below can be actually scheduled // later than the execution of the nodeCloseableResource graph node. // Therefore, we want to mark the renewal process as started before the goroutine spawning to be sure // that the execution of nodeCloseableResource will block on the diagnostics reported by the // goroutine below. n.renewStarted.Store(true) - // The renewer is taking care of calling provider.Renew if resp.RenewAt != nil. - // But if resp.RenewAt == nil, renewer holds only the resp.Private that will be used later - // when calling provider.CloseEphemeralResource. - go n.startEphemeralRenew(ctx, evalCtx, provider, resp.RenewAt, resp.Private) return newVal, diags } @@ -2724,11 +2679,11 @@ func (n *NodeAbstractResourceInstance) applyProvisioners(ctx context.Context, ev } // start with an empty connInfo - connInfo := cty.NullVal(shared.ConnectionBlockSupersetSchema.ImpliedType()) + connInfo := cty.NullVal(commShared.ConnectionBlockSupersetSchema.ImpliedType()) if connBody != nil { var connInfoDiags tfdiags.Diagnostics - connInfo, connInfoDiags = evalScope(ctx, evalCtx, connBody, self, shared.ConnectionBlockSupersetSchema) + connInfo, connInfoDiags = evalScope(ctx, evalCtx, connBody, self, commShared.ConnectionBlockSupersetSchema) diags = diags.Append(connInfoDiags) if diags.HasErrors() { return diags @@ -3427,74 +3382,6 @@ func (n *NodeAbstractResourceInstance) planEphemeralResource(ctx context.Context return plannedChange, plannedNewState, keyData, diags } -func (n *NodeAbstractResourceInstance) startEphemeralRenew(ctx context.Context, evalContext EvalContext, provider providers.Interface, renewAt *time.Time, privateData []byte) { - if n.Addr.Resource.Resource.Mode != addrs.EphemeralResourceMode { - panic("renewal process cannot be started for resources other than ephemeral ones. This is an OpenTofu issue, please report it") - } - privateData, diags := n.renewEphemeral(ctx, evalContext, provider, renewAt, privateData) - // wait for the close signal. This is like this because the renewEphemeral can return right away if the renewAt is nil. - // But the close of the ephemeral should happen only when the graph walk is reaching the execution of the closing - // ephemeral resource node. - <-n.closeCh - diags = diags.Append(n.closeEphemeralResource(ctx, evalContext, provider, privateData)) - n.ephemeralDiags <- diags -} - -func (n *NodeAbstractResourceInstance) closeEphemeralResource(ctx context.Context, evalContext EvalContext, provider providers.Interface, privateData []byte) (diags tfdiags.Diagnostics) { - req := providers.CloseEphemeralResourceRequest{ - TypeName: n.Addr.Resource.Resource.Type, - Private: privateData, - } - - // We are using cty.EmptyObject for the PreApply and PostApply because the prior state - // and the new planned state does not matter in ephemeral resources context, especially - // in the context of the close operation. - diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) { - return h.PreClose(n.Addr) - })) - resp := provider.CloseEphemeralResource(ctx, req) - diags = diags.Append(resp.Diagnostics) - - diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) { - return h.PostClose(n.Addr, diags.Err()) - })) - return diags.Append(diags) -} - -// renewEphemeral is meant to be called into a goroutine. This method listens on ctx.Done and n.closeCh for ending the job and -// to return the data. -func (n *NodeAbstractResourceInstance) renewEphemeral(ctx context.Context, evalContext EvalContext, provider providers.Interface, renewAt *time.Time, privateData []byte) ([]byte, tfdiags.Diagnostics) { - var diags tfdiags.Diagnostics - for { - if renewAt == nil { - return privateData, diags - } - select { - case <-time.After(time.Until(*renewAt)): - case <-n.closeCh: - return privateData, diags - case <-ctx.Done(): - return privateData, diags - } - diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) { - // We are using cty.EmptyObject here because the prior state and the new planned state does not matter - // in ephemeral resources context, especially in the context of the renew operation. - return h.PreRenew(n.Addr) - })) - req := providers.RenewEphemeralResourceRequest{ - TypeName: n.Addr.Resource.Resource.Type, - Private: privateData, - } - resp := provider.RenewEphemeralResource(ctx, req) - diags = diags.Append(evalContext.Hook(func(h Hook) (HookAction, error) { - return h.PostRenew(n.Addr, diags.Err()) - })) - diags = diags.Append(resp.Diagnostics) - renewAt = resp.RenewAt - privateData = resp.Private - } -} - // deferEphemeralResource is a helper function that builds a change and a state object by using a // partial value and is announcing the deferral of the ephemeral resource. func (n *NodeAbstractResourceInstance) deferEphemeralResource(evalCtx EvalContext, schema *configschema.Block, priorVal cty.Value, configVal cty.Value, reason string) ( @@ -3544,21 +3431,7 @@ func (n *NodeAbstractResourceInstance) Close() tfdiags.Diagnostics { // If the ephemeral resource has been deferred, this method needs to return immediately. return nil } - defer func() { - close(n.ephemeralDiags) - n.renewStarted.Store(false) - }() - close(n.closeCh) - timeout := 10 * time.Second - select { - case d := <-n.ephemeralDiags: - return d - case <-time.After(timeout): - return tfdiags.Diagnostics{}.Append(&hcl.Diagnostic{ - Severity: hcl.DiagError, - Summary: "Closing ephemeral resource timed out", - Detail: fmt.Sprintf("The ephemeral resource %q timed out on closing after %s", n.Addr.String(), timeout), - Subject: n.Config.DeclRange.Ptr(), - }) - } + defer n.renewStarted.Store(false) + + return n.ephemeralCloseFn() }