diff --git a/internal/engine/internal/execgraph/compiled.go b/internal/engine/internal/execgraph/compiled.go index 875c3a9388..4b1178712c 100644 --- a/internal/engine/internal/execgraph/compiled.go +++ b/internal/engine/internal/execgraph/compiled.go @@ -18,7 +18,7 @@ import ( ) type CompiledGraph struct { - // ops is the main essence of a compiled graph: a series of functions + // steps is the main essence of a compiled graph: a series of functions // that we'll run all at once, one goroutine each, and then wait until // they've all returned something. // @@ -27,7 +27,7 @@ type CompiledGraph struct { // compiler to arrange for the necessary data flow while it's building // these compiled operations. Execution is complete once all of these // functions have returned. - ops []anyCompiledOperation + steps []nodeExecuteRaw // resourceInstanceValues provides a function for each resource instance // that was registered as a "sink" during graph building which blocks @@ -65,10 +65,10 @@ func (c *CompiledGraph) Execute(ctx context.Context) tfdiags.Diagnostics { var diagsMu sync.Mutex var wg sync.WaitGroup - wg.Add(len(c.ops)) - for _, op := range c.ops { + wg.Add(len(c.steps)) + for _, op := range c.steps { wg.Go(func() { - opDiags := op(grapheval.ContextWithNewWorker(ctx)) + _, _, opDiags := op(grapheval.ContextWithNewWorker(ctx)) diagsMu.Lock() diags = diags.Append(opDiags) diagsMu.Unlock() diff --git a/internal/engine/internal/execgraph/compiler.go b/internal/engine/internal/execgraph/compiler.go index b9181d75fa..2d2d31434f 100644 --- a/internal/engine/internal/execgraph/compiler.go +++ b/internal/engine/internal/execgraph/compiler.go @@ -7,22 +7,53 @@ package execgraph import ( "context" + "fmt" + "iter" + "strings" + "sync" "github.com/apparentlymart/go-workgraph/workgraph" "github.com/zclconf/go-cty/cty" "github.com/opentofu/opentofu/internal/addrs" + "github.com/opentofu/opentofu/internal/lang/eval" + "github.com/opentofu/opentofu/internal/lang/grapheval" + "github.com/opentofu/opentofu/internal/states" + "github.com/opentofu/opentofu/internal/tfdiags" ) -func (g *Graph) Compile() *CompiledGraph { +// Compile produces a compiled version of the graph which will, once executed, +// use the given arguments to interact with other parts of the broader system. +// +// TODO: This currently takes a prior state snapshot using our current models, +// but the state model we have probably isn't the best for this new execution +// approach. +func (g *Graph) Compile(oracle *eval.ApplyOracle, evalCtx *eval.EvalContext, priorState *states.SyncState) (*CompiledGraph, tfdiags.Diagnostics) { ret := &CompiledGraph{ - ops: make([]anyCompiledOperation, len(g.ops)), resourceInstanceValues: addrs.MakeMap[addrs.AbsResourceInstance, func(ctx context.Context) cty.Value](), cleanupWorker: workgraph.NewWorker(), } c := &compiler{ - sourceGraph: g, - compiledGraph: ret, + sourceGraph: g, + compiledGraph: ret, + oracle: oracle, + opResolvers: make([]workgraph.Resolver[nodeResultRaw], len(g.ops)), + opResults: make([]workgraph.Promise[nodeResultRaw], len(g.ops)), + desiredStateFuncs: make([]nodeExecuteRaw, len(g.desiredStateRefs)), + providerInstConfigFuncs: make([]nodeExecuteRaw, len(g.providerInstConfigRefs)), + } + // We'll prepopulate all of the operation promises, and then the compiler + // will arrange for them to each get wired where they need to be. + for i := range c.opResults { + // The "cleanupWorker" is initially the responsible worker, but + // the compiler arranges for responsibility to transfer to per-operation + // workers created dynamically as the graph is executed, so in the + // happy path cleanupWorker should end up responsible for nothing + // at the end. (If that isn't true then all of the remaining requests + // will force-fail when the compiled graph gets garbage collected.) + resolver, promise := workgraph.NewRequest[nodeResultRaw](ret.cleanupWorker) + c.opResolvers[i] = resolver + c.opResults[i] = promise } return c.Compile() } @@ -35,9 +66,432 @@ func (g *Graph) Compile() *CompiledGraph { type compiler struct { sourceGraph *Graph compiledGraph *CompiledGraph + oracle *eval.ApplyOracle + evalCtx *eval.EvalContext + priorState *states.SyncState + + // opResolvers and opResults track our requests for our operation results, + // each of which should be resolved by one of the "steps" in the compiled + // graph so that the data can then propagate between nodes. + // + // The indices of this slice correspond to the indices of sourceGraph.ops. + // The promises in here are initially owned by compiledGraph.cleanupWorker, + // but responsibility for them is transferred to the worker for each + // operation's "step" in the compiled graph once they begin executing. + opResolvers []workgraph.Resolver[nodeResultRaw] + opResults []workgraph.Promise[nodeResultRaw] + + // Some of our node types cause fallible side-effects and so we memoize + // what we returned to ensure that the action only runs once and then + // its results are distributed to all referrers. + // + // The indices of each of these correlate with the matching slices in + // sourceGraph. + desiredStateFuncs []nodeExecuteRaw + providerInstConfigFuncs []nodeExecuteRaw + + // diags accumulates any problems we detect during the compilation process, + // which are ultimately returned by [compiler.Compile] so that the caller + // knows not to even try executing the result graph. + diags tfdiags.Diagnostics } -func (c *compiler) Compile() *CompiledGraph { - // TODO: Implement - return c.compiledGraph +func (c *compiler) Compile() (*CompiledGraph, tfdiags.Diagnostics) { + // Although the _execution_ of the compiled graph runs all of the steps + // concurrently, the compiler itself is intentionally written as + // sequential code in the hope of that making it easier to understand + // and maintain, since it's inevitably quite self-referential as it + // turns the source graph into a series of executable functions. + + // The operations are the main part of the graph we actually care about + // because they represent externally-visible side-effects. We'll use + // those as our main vehicle for compilation, producing compiled versions + // of other nodes as we go along only as needed to satisfy the operations. + opResolvers := c.opResolvers + for opIdx, opDesc := range c.sourceGraph.ops { + operands := newCompilerOperands(c.compileOperands(opDesc.operands)) + var compileFunc func(operands *compilerOperands) nodeExecuteRaw + switch opDesc.opCode { + case opOpenProvider: + compileFunc = c.compileOpOpenProvider + default: + c.diags = c.diags.Append(tfdiags.Sourceless( + tfdiags.Error, + "Unsupported opcode in execution graph", + fmt.Sprintf("Execution graph includes opcode %s, but the compiler doesn't know how to handle it. This is a bug in OpenTofu.", opDesc.opCode), + )) + continue + } + // The main execution function deals with the opCode-specific behavior, + // but we need to wrap it in some general code that arranges for + // the operation results to propagate through the graph using the + // promises set up in [Graph.Compile]. + mainExec := compileFunc(operands) + graphExec := func(parentCtx context.Context) (any, bool, tfdiags.Diagnostics) { + // Each operation's execution must have its own workgraph worker + // that's responsible for resolving the associated promise, since + // that allows us to detect if operations try to depend on their + // own results, or if the implementation panics and thus causes + // this worker to get garbage-collected. + resolver := opResolvers[opIdx] + worker := workgraph.NewWorker(resolver) + ctx := grapheval.ContextWithWorker(parentCtx, worker) + ret, ok, diags := mainExec(ctx) + // Resolving the promise might allow dependent operations to begin. + resolver.ReportSuccess(worker, nodeResultRaw{ + Value: ret, + CanContinue: ok, + Diagnostics: diags, + }) + return ret, ok, diags + } + c.compiledGraph.steps = append(c.compiledGraph.steps, graphExec) + } + if c.diags.HasErrors() { + // Don't expose the likely-invalid compiled graph, then. + return nil, c.diags + } + + // Before we return we also need to fill in the resource instance values + // so that it's possible to get the information needed to satisfy the + // evaluation system. + for _, elem := range c.sourceGraph.resourceInstanceResults.Elems { + instAddr := elem.Key + ref := elem.Value + execFunc := c.compileResultRef(ref) + c.compiledGraph.resourceInstanceValues.Put(instAddr, func(ctx context.Context) cty.Value { + rawResult, ok, _ := execFunc(ctx) + if !ok { + return cty.DynamicVal + } + finalStateObj := rawResult.(*states.ResourceInstanceObject) + return finalStateObj.Value + }) + } + + return c.compiledGraph, c.diags +} + +func (c *compiler) compileOperands(refs []AnyResultRef) iter.Seq2[AnyResultRef, nodeExecuteRaw] { + return func(yield func(AnyResultRef, nodeExecuteRaw) bool) { + for _, ref := range refs { + exec := c.compileResultRef(ref) + if !yield(ref, exec) { + return + } + } + } +} + +// compileResultRef transforms a result reference into a function that blocks +// until the associated result is ready and then returns that result as a +// value of type [any], which the caller could then cast into the concrete +// type that the result was expected to produce. +func (c *compiler) compileResultRef(ref AnyResultRef) nodeExecuteRaw { + // The closures we return should only capture primitive values and + // pointers to as small a part of the compiler's state as possible, so + // that the overall compiler object can be garbage-collected once + // compilation is complete. + oracle := c.oracle + + // For any of the cases that return functions that cause side-effects that + // can potentially fail we must use a "once" wrapper to ensure that the + // execution is coalesced for all callers, and make sure it's included + // in the "steps" of the compiled graph so that any diagnostics will be + // recorded. + + const errSummary = "Invalid execution graph" + switch ref := ref.(type) { + case valueResultRef: + vals := c.sourceGraph.constantVals + index := ref.index + return func(_ context.Context) (any, bool, tfdiags.Diagnostics) { + return vals[index], true, nil + } + case providerAddrResultRef: + providerAddrs := c.sourceGraph.providerAddrs + index := ref.index + return func(_ context.Context) (any, bool, tfdiags.Diagnostics) { + return providerAddrs[index], true, nil + } + case desiredResourceInstanceResultRef: + resourceInstAddrs := c.sourceGraph.desiredStateRefs + index := ref.index + if existing := c.desiredStateFuncs[index]; existing != nil { + return existing + } + c.desiredStateFuncs[index] = nodeExecuteRawOnce(func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + desired := oracle.DesiredResourceInstance(ctx, resourceInstAddrs[index]) + if desired == nil { + // If we get here then it suggests a bug in the planning engine, + // because it should not include a node referring to a resource + // instance that is not part of the desired state. + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + errSummary, + fmt.Sprintf("The execution graph expects desired state for %s, but the evaluation system does not consider this resource instance to be \"desired\". This is a bug in OpenTofu.", resourceInstAddrs[index]), + )) + return nil, false, diags + } + return desired, true, diags + }) + c.compiledGraph.steps = append(c.compiledGraph.steps, c.desiredStateFuncs[index]) + return c.desiredStateFuncs[index] + case resourceInstancePriorStateResultRef: + priorState := c.priorState + priorStateRefs := c.sourceGraph.priorStateRefs + index := ref.index + return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + priorStateRef := priorStateRefs[index] + var gen states.Generation + if priorStateRef.DeposedKey == states.NotDeposed { + gen = states.CurrentGen + } else { + gen = priorStateRef.DeposedKey + } + obj := priorState.ResourceInstanceObject(priorStateRef.ResourceInstance, gen) + if obj == nil { + // If we get here then it suggests a bug in the planning engine, + // because it should not include a node referring to a resource + // instance object that is not part of the prior state. (An + // object being created should have its prior state set to a + // constant nil, without referring to prior state.) + name := priorStateRef.ResourceInstance.String() + if priorStateRef.DeposedKey != states.NotDeposed { + name += fmt.Sprintf("deposed object %s", priorStateRef.DeposedKey) + } + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + errSummary, + fmt.Sprintf("The execution graph expects prior state for %s, but no such object exists in the state. This is a bug in OpenTofu.", name), + )) + return nil, false, diags + } + return obj, true, diags + } + case providerInstanceConfigResultRef: + providerInstConfigRefs := c.sourceGraph.providerInstConfigRefs + index := ref.index + if existing := c.providerInstConfigFuncs[index]; existing != nil { + return existing + } + c.providerInstConfigFuncs[index] = nodeExecuteRawOnce(func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + configVal := oracle.ProviderInstanceConfig(ctx, providerInstConfigRefs[index]) + if configVal == cty.NilVal { + // If we get here then it suggests a bug in the planning engine, + // because it should not include a node referring to a provider + // instance that is not present in the configuration. + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + errSummary, + fmt.Sprintf("The execution graph expects configuration for %s, but the evaluation system does not know about that provider instance. This is a bug in OpenTofu.", providerInstConfigRefs[index]), + )) + return nil, false, diags + } + return configVal, true, diags + }) + c.compiledGraph.steps = append(c.compiledGraph.steps, c.desiredStateFuncs[index]) + return c.providerInstConfigFuncs[index] + case anyOperationResultRef: + // Operations have different result types depending on their opcodes, + // but at this point we just represent everything as "any" and expect + // that the downstream operations that rely on these results will + // type-assert them dynamically as needed. + opResults := c.opResults + opResolvers := c.opResolvers + index := ref.operationResultIndex() + return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + promise := opResults[index] + resultRaw, err := promise.Await(grapheval.WorkerFromContext(ctx)) + if err != nil { + // An error here always means that the workgraph library has + // detected a problem that might have caused a deadlock, which + // during the apply phase is always a bug in OpenTofu because + // we should've detected any user-caused problems during the + // planning phase. + diags = diags.Append(diagsForWorkgraphError(ctx, err, opResolvers)) + return nil, false, diags + } + return resultRaw.Value, resultRaw.CanContinue, resultRaw.Diagnostics + } + case waiterResultRef: + // In this case we'll precompile the results we're waiting for because + // then we can catch certain graph consistency problems sooner. + waitForRefs := c.sourceGraph.waiters[ref.index] + waiters := make([]nodeExecuteRaw, len(waitForRefs)) + for i, waitForRef := range waitForRefs { + waiters[i] = c.compileResultRef(waitForRef) + } + return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + callerCanContinue := true + for _, waiter := range waiters { + _, ok, moreDiags := waiter(ctx) + diags = diags.Append(moreDiags) + if !ok { + // We'll remember that the caller is supposed to stop + // but we'll continue through our set of waiters in case + // we find any other diagnostics to propagate. + callerCanContinue = false + } + } + return struct{}{}, callerCanContinue, diags + } + default: + c.diags = append(c.diags, tfdiags.Sourceless( + tfdiags.Error, + errSummary, + fmt.Sprintf("The execution graph includes %#v, but the compiler doesn't know how to handle it. This is a bug in OpenTofu.", ref), + )) + return nil + } +} + +// nodeExecuteRaw is the lowest-level representation of producing a result, +// without any static type information yet. +// +// If the returned diagnostics includes errors then the caller must not try +// to type-assert the first result, and should instead just return the +// diagnostics along with its own nil result. +type nodeExecuteRaw = func(ctx context.Context) (any, bool, tfdiags.Diagnostics) + +// nodeExecuteRawOnce returns a [nodeExecuteRaw] that will call the given +// [nodeExecuteRaw] only once on first call and then return its result to all +// future callers. +// +// Each call to this function is independent even if two calls wrap the same +// [nodeExecuteRaw]. Callers should probably stash their result somewhere to +// reuse it for other callers that ought to share the result. +func nodeExecuteRawOnce(inner nodeExecuteRaw) nodeExecuteRaw { + // This mutex only for avoiding races to _start_ the request. It must not + // be used to await the result because we want to use the workgraph + // machinery to detect failures to resolve if e.g. the wrapped function + // panics. + var mu sync.Mutex + var reqID workgraph.RequestID + var promise workgraph.Promise[nodeResultRaw] + return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + worker := grapheval.WorkerFromContext(ctx) + + mu.Lock() // We hold this only while ensuring there's an active request + if reqID == workgraph.NoRequest { + // This is the first request, so we'll actually run the function + // but first we'll set up the workgraph request so that subsequent + // callers can wait for it. + var resolver workgraph.Resolver[nodeResultRaw] + resolver, promise = workgraph.NewRequest[nodeResultRaw](worker) + mu.Unlock() // Allow concurrent callers to begin awaiting the promise + + ret, ok, diags := inner(ctx) + resolver.ReportSuccess(worker, nodeResultRaw{ + Value: ret, + CanContinue: ok, + Diagnostics: diags, + }) + return ret, ok, diags + } + mu.Unlock() + + result, err := promise.Await(worker) + diags := result.Diagnostics + if err != nil { + diags = diags.Append(diagsForWorkgraphError(ctx, err, nil)) + result.CanContinue = false + } + return result.Value, result.CanContinue, diags + } +} + +// nodeExecute is the type of a function that blocks until the result of a node +// is available and then returns that result. +// +// The boolean result is true if the caller is allowed to take any action based +// on the result. If it is false then the callers should ignore the T result +// and immediately return, on the assumption that something upstream has failed +// and will have already returned some diagnostics. +type nodeExecute[T any] func(ctx context.Context) (T, bool, tfdiags.Diagnostics) + +type nodeResultRaw struct { + Value any + CanContinue bool + Diagnostics tfdiags.Diagnostics +} + +func diagsForWorkgraphError(ctx context.Context, err error, operationResolvers []workgraph.Resolver[nodeResultRaw]) tfdiags.Diagnostics { + // findRequestName makes a best effort to describe the given workgraph request + // in terms of operations in the execution graph, though because all of + // these are "should never happen" cases this focuses mainly on providing + // information to help OpenTofu developers with debugging, rather than + // end-user-friendly information. (Any user-caused problems ought to have + // been detected during the planning phase, so any problem we encounter + // during apply is always an OpenTofu bug.) + // + // As usual we tolerate this being a pretty inefficient linear search + // over all of the requests we know about because we should only end up + // here when something has gone very wrong, and this approach avoids + // tracking a bunch of extra debug state in the happy path. + findRequestName := func(reqId workgraph.RequestID) string { + for opIdx, resolver := range operationResolvers { + if resolver.RequestID() == reqId { + return fmt.Sprintf("execution graph operation r[%d]", opIdx) + } + } + // If we fall out here then we presumably have a request ID from some + // other part of the system, such as from package configgraph. We + // might be able to get a useful description from a request tracker + // attached to the given context, if so. + // Note that we shouldn't really get here if the execution graph was + // constructed correctly because the "waiter" nodes used by anything + // that refers to the evaluator's oracle should block us from trying + // to retrieve something that isn't ready yet, but we'll attempt this + // anyway because if we get here then there's a bug somewhere by + // definition. + if reqTracker := grapheval.RequestTrackerFromContext(ctx); reqTracker != nil { + for candidate, info := range reqTracker.ActiveRequests() { + if candidate == reqId { + return info.Name + } + } + } + // If all of that failed then we'll just return a useless placeholder + // and hope that something else in the error message or debug log + // gives some clue as to what's going on. + return "" + } + + var diags tfdiags.Diagnostics + const summary = "Apply-time execution error" + switch err := err.(type) { + case workgraph.ErrSelfDependency: + var buf strings.Builder + buf.WriteString("While performing actions during the apply phase, OpenTofu detected a self-dependency cycle between the following:\n") + for _, reqId := range err.RequestIDs { + fmt.Fprintf(&buf, " - %s\n", findRequestName(reqId)) + } + buf.WriteString("\nThis is a bug in OpenTofu.") + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + summary, + buf.String(), + )) + case workgraph.ErrUnresolved: + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + summary, + fmt.Sprintf("While performing actions during the apply phase, a request for %q was left unresolved. This is a bug in OpenTofu.", findRequestName(err.RequestID)), + )) + default: + // We're not expecting any other error types here so we'll just + // return something generic. + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + summary, + fmt.Sprintf("While performing actions during the apply phase, OpenTofu encountered an unexpected error: %s.\n\nThis is a bug in OpenTofu.", err), + )) + } + return diags } diff --git a/internal/engine/internal/execgraph/compiler_operands.go b/internal/engine/internal/execgraph/compiler_operands.go new file mode 100644 index 0000000000..da8dd5009b --- /dev/null +++ b/internal/engine/internal/execgraph/compiler_operands.go @@ -0,0 +1,147 @@ +// Copyright (c) The OpenTofu Authors +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2023 HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package execgraph + +import ( + "context" + "fmt" + "iter" + "strings" + + "github.com/opentofu/opentofu/internal/tfdiags" +) + +// compilerOperands is a helper for concisely unpacking the operands of an +// operation while asserting the result types they are expected to produce. +// +// Users of this should call [nextOperand] for each expected operand in turn, +// and then call [compilerOperands.Finish] to collect error diagnostics for +// any problems that were detected and to ensure that the internal state is +// cleaned up correctly. If the Finish method returns error diagnostics then +// none of the results from [nextOperand] should be used. +// +// // assuming that "operands" is a pointer to a compilerOperands object +// getProviderAddr := nextOperand[addrs.Provider](operands) +// getProviderConfig := nextOperand[cty.Value](operands) +// waitForDependencies := operands.OperandWaiter() +// diags := operands.Finish() +// if diags.HasErrors() { +// // compilation fails +// } +type compilerOperands struct { + nextOperand func() (AnyResultRef, nodeExecuteRaw, bool) + stop func() + idx int + problems []string +} + +// newCompilerOperands prepares a new [compilerOperands] object that produces +// results based on the given sequence of operands, which was presumably +// returned by [compiler.compileOperands]. +// +// Refer to the documentation of [compilerOperands] for an example of how to +// use the result. +func newCompilerOperands(operands iter.Seq2[AnyResultRef, nodeExecuteRaw]) *compilerOperands { + next, stop := iter.Pull2(operands) + return &compilerOperands{ + nextOperand: next, + stop: stop, + idx: 0, + problems: nil, + } +} + +func nextOperand[T any](operands *compilerOperands) nodeExecute[T] { + idx := operands.idx + operands.idx++ + resultRef, execRaw, ok := operands.nextOperand() + if !ok { + operands.problems = append(operands.problems, fmt.Sprintf("missing expected operand %d", idx)) + return nil + } + // We'll catch type mismatches during compile time as long as the compiler + // produces correct nodeExecuteRaw implementations that actually honor + // the expected type. + if _, typeOk := resultRef.(ResultRef[T]); !typeOk { + var zero T + operands.problems = append(operands.problems, fmt.Sprintf("operand %d not of expected type %T (got %T)", idx, any(zero), resultRef)) + return nil + } + + return func(ctx context.Context) (T, bool, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + // We intentionally don't propagate diagnostics here because they + // describe problems that the node associated with this operand will + // report directly when visited by [CompiledGraph.Execute]. We only + // want to return diagnostics that are unique to this particular + // reference to the node, such as the type mismatch error below. + resultRaw, ok, _ := execRaw(ctx) + if !ok { + var zero T + return zero, false, nil + } + result, ok := resultRaw.(T) + if !ok { + // We'll get here if the execRaw function was compiled incorrectly + // so that its actual result does not agree with the type of the + // ResultRef it was expected to satisfy. + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + "Invalid execution graph compilation", + fmt.Sprintf("Operand %d was supposed to be %T, but its implementation produced %T. This is a bug in OpenTofu.", idx, result, resultRaw), + )) + var zero T + return zero, false, diags + } + return result, true, diags + } +} + +// OperandWaiter is a variant of [nextOperand] for operands that don't produce +// a useful value and exist only to block beginning some other work until +// they have completed. +// +// If the returned function produces false then the caller must immediately +// return without doing any other work, because some upstream has failed and +// so we need to unwind and report the collected errors. +func (ops *compilerOperands) OperandWaiter() func(ctx context.Context) bool { + idx := ops.idx + ops.idx++ + _, execRaw, ok := ops.nextOperand() + if !ok { + ops.problems = append(ops.problems, fmt.Sprintf("missing expected operand %d", idx)) + return nil + } + return func(ctx context.Context) bool { + _, canContinue, _ := execRaw(ctx) + return canContinue + } +} + +func (ops *compilerOperands) Finish() tfdiags.Diagnostics { + // Regardless of how this terminates we no longer need the operand iterator. + defer ops.stop() + + var diags tfdiags.Diagnostics + problems := ops.problems + if _, _, anotherOperand := ops.nextOperand(); anotherOperand { + problems = append(problems, fmt.Sprintf("expected only %d operands, but found additional operands", ops.idx)) + } + if len(problems) != 0 { + var buf strings.Builder + buf.WriteString("Found incorrect operands when compiling operation:\n") + for _, problem := range problems { + fmt.Fprintf(&buf, " - %s\n", problem) + } + buf.WriteString("\nThis is a bug in OpenTofu.") + diags = diags.Append(tfdiags.Sourceless( + tfdiags.Error, + "Invalid operands for execution graph operation", + buf.String(), + )) + } + return diags +} diff --git a/internal/engine/internal/execgraph/compiler_ops.go b/internal/engine/internal/execgraph/compiler_ops.go new file mode 100644 index 0000000000..7e39b08841 --- /dev/null +++ b/internal/engine/internal/execgraph/compiler_ops.go @@ -0,0 +1,52 @@ +// Copyright (c) The OpenTofu Authors +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2023 HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package execgraph + +import ( + "context" + + "github.com/zclconf/go-cty/cty" + + "github.com/opentofu/opentofu/internal/addrs" + "github.com/opentofu/opentofu/internal/tfdiags" +) + +func (c *compiler) compileOpOpenProvider(operands *compilerOperands) nodeExecuteRaw { + getProviderAddr := nextOperand[addrs.Provider](operands) + getConfigVal := nextOperand[cty.Value](operands) + waitForDeps := operands.OperandWaiter() + diags := operands.Finish() + c.diags = c.diags.Append(diags) + if diags.HasErrors() { + return nil + } + + providers := c.evalCtx.Providers + + return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) { + var diags tfdiags.Diagnostics + if !waitForDeps(ctx) { + return nil, false, diags + } + providerAddr, ok, moreDiags := getProviderAddr(ctx) + diags = diags.Append(moreDiags) + if !ok { + return nil, false, diags + } + configVal, ok, moreDiags := getConfigVal(ctx) + diags = diags.Append(moreDiags) + if !ok { + return nil, false, diags + } + + ret, moreDiags := providers.NewConfiguredProvider(ctx, providerAddr, configVal) + diags = diags.Append(moreDiags) + if moreDiags.HasErrors() { + return nil, false, diags + } + return ret, true, diags + } +} diff --git a/internal/engine/internal/execgraph/compiler_test.go b/internal/engine/internal/execgraph/compiler_test.go new file mode 100644 index 0000000000..54050e69c5 --- /dev/null +++ b/internal/engine/internal/execgraph/compiler_test.go @@ -0,0 +1,6 @@ +// Copyright (c) The OpenTofu Authors +// SPDX-License-Identifier: MPL-2.0 +// Copyright (c) 2023 HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package execgraph diff --git a/internal/lang/eval/config_apply.go b/internal/lang/eval/config_apply.go index 27e03302ae..a62c8cbe61 100644 --- a/internal/lang/eval/config_apply.go +++ b/internal/lang/eval/config_apply.go @@ -88,3 +88,31 @@ func (c *ConfigInstance) DriveApplying(ctx context.Context, glue ApplyGlue, run // the oracle only once it has already been made available by earlier work. type ApplyOracle struct { } + +// DesiredResourceInstance returns the [DesiredResourceInstance] object +// associated with the given resource instance address, or nil if the given +// address does not match a desired resource instance. +// +// This API assumes that the apply phase is working from an execution graph +// built during the planning phase and is therefore relying on the plan phase +// to correctly describe a subset of the desired resource instances so that +// this should never return nil. If this _does_ return nil then that suggests +// a bug in the planning engine, which caused it to create an incorrect +// execution graph. +func (o *ApplyOracle) DesiredResourceInstance(ctx context.Context, addr addrs.AbsResourceInstance) *DesiredResourceInstance { + // TODO: Implement + panic("unimplemented") +} + +// ProviderInstanceConfig returns the configuration value for the given +// provider instance, or [cty.NilVal] if there is no such provider instance. +// +// This API assumes that the apply phase is working from an execution graph +// built during the planning phase and is therefore relyingo n the plan phase +// to refer only to provider instances that are present ni the configuration. +// If this _does_ return cty.NilVal then that suggests a bug in the planning +// engine, causing it to create an incorrect execution graph. +func (o *ApplyOracle) ProviderInstanceConfig(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) cty.Value { + // TODO: Implement + panic("unimplemented") +} diff --git a/internal/lang/grapheval/context.go b/internal/lang/grapheval/context.go index a1df6edb39..22f95576cc 100644 --- a/internal/lang/grapheval/context.go +++ b/internal/lang/grapheval/context.go @@ -53,12 +53,9 @@ func ContextWithRequestTracker(parent context.Context, tracker RequestTracker) c return context.WithValue(parent, trackerContextKey, tracker) } -// requestTrackerFromContext returns the request tracker associated with the +// RequestTrackerFromContext returns the request tracker associated with the // given context, or nil if there is no request tracker. -// -// This is unexported because request trackers are provided by external code -// but only used by code within this package. -func requestTrackerFromContext(ctx context.Context) RequestTracker { +func RequestTrackerFromContext(ctx context.Context) RequestTracker { tracker, ok := ctx.Value(trackerContextKey).(RequestTracker) if !ok { return nil diff --git a/internal/lang/grapheval/diagnostics.go b/internal/lang/grapheval/diagnostics.go index d32d6acda9..bc406774e1 100644 --- a/internal/lang/grapheval/diagnostics.go +++ b/internal/lang/grapheval/diagnostics.go @@ -30,7 +30,7 @@ import ( // information and will report that missing information as being a bug in // OpenTofu, because we should always be tracking requests correctly. func DiagnosticsForWorkgraphError(ctx context.Context, err error) tfdiags.Diagnostics { - tracker := requestTrackerFromContext(ctx) + tracker := RequestTrackerFromContext(ctx) if tracker == nil { // In this case we must return lower-quality error messages because