Parallelize provider installation (#2729)

Signed-off-by: Christian Mesh <christianmesh1@gmail.com>
This commit is contained in:
Christian Mesh 2025-12-01 11:55:53 -05:00 committed by GitHub
parent 607d74c882
commit 5e7397b8a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 306 additions and 91 deletions

View file

@ -11,6 +11,7 @@ ENHANCEMENTS:
- `prevent_destroy` arguments in the `lifecycle` block for managed resources can now use references to other symbols in the same module, such as to a module's input variables. ([#3474](https://github.com/opentofu/opentofu/issues/3474), [#3507](https://github.com/opentofu/opentofu/issues/3507))
- OpenTofu now uses the `BROWSER` environment variable when launching a web browser on Unix platforms, as long as it's set to a single command that can accept a URL to open as its first and only argument. ([#3456](https://github.com/opentofu/opentofu/issues/3456))
- Improve performance around provider checking and schema management. ([#2730](https://github.com/opentofu/opentofu/pull/2730))
- `tofu init` now fetches providers and their metadata in parallel. Depending on provider size and network conditions, this can reduce the time spent checking and installing providers. ([#2729](https://github.com/opentofu/opentofu/pull/2729))
BUG FIXES:

View file

@ -960,6 +960,8 @@ func (c *InitCommand) getProviders(ctx context.Context, config *configs.Config,
}
},
}
// Provider installation may emit events from multiple goroutines, so serialize the callbacks to avoid data races
evts = evts.Sync()
ctx = evts.OnContext(ctx)
mode := providercache.InstallNewProvidersOnly

View file

@ -268,6 +268,8 @@ func (c *ProvidersLockCommand) Run(args []string) int {
c.Ui.Output(fmt.Sprintf("- Retrieved %s %s for %s (%s%s)", provider.ForDisplay(), version, platform, auth, keyID))
},
}
// Provider installation may emit events from multiple goroutines, so serialize the callbacks to avoid data races
evts = evts.Sync()
ctx := evts.OnContext(ctx)
dir := providercache.NewDirWithPlatform(tempDir, platform)

View file

@ -8,6 +8,7 @@ package getproviders
import (
"context"
"fmt"
"sync"
"github.com/apparentlymart/go-versions/versions"
"github.com/opentofu/opentofu/internal/addrs"
@ -22,6 +23,8 @@ type FilesystemMirrorSource struct {
// packages on the first call that needs package availability information,
// to avoid re-scanning the filesystem on subsequent operations.
allPackages map[addrs.Provider]PackageMetaList
lock sync.Mutex
}
var _ Source = (*FilesystemMirrorSource)(nil)
@ -117,6 +120,9 @@ func (s *FilesystemMirrorSource) AllAvailablePackages() (map[addrs.Provider]Pack
}
func (s *FilesystemMirrorSource) scanAllVersions() error {
s.lock.Lock()
defer s.lock.Unlock()
if s.allPackages != nil {
// we're distinguishing nil-ness from emptiness here so we can
// recognize when we've scanned the directory without errors, even

View file

@ -26,11 +26,12 @@ import (
type MemoizeSource struct {
underlying Source
mu sync.Mutex
availableVersions map[addrs.Provider]memoizeAvailableVersionsRet
packageMetas map[memoizePackageMetaCall]memoizePackageMetaRet
availableVersions map[addrs.Provider]*memoizeAvailableVersionsRet
packageMetas map[memoizePackageMetaCall]*memoizePackageMetaRet
}
type memoizeAvailableVersionsRet struct {
sync.Mutex
VersionList VersionList
Warnings Warnings
Err error
@ -43,6 +44,7 @@ type memoizePackageMetaCall struct {
}
type memoizePackageMetaRet struct {
sync.Mutex
PackageMeta PackageMeta
Err error
}
@ -54,8 +56,8 @@ var _ Source = (*MemoizeSource)(nil)
func NewMemoizeSource(underlying Source) *MemoizeSource {
return &MemoizeSource{
underlying: underlying,
availableVersions: make(map[addrs.Provider]memoizeAvailableVersionsRet),
packageMetas: make(map[memoizePackageMetaCall]memoizePackageMetaRet),
availableVersions: make(map[addrs.Provider]*memoizeAvailableVersionsRet),
packageMetas: make(map[memoizePackageMetaCall]*memoizePackageMetaRet),
}
}
@ -63,44 +65,76 @@ func NewMemoizeSource(underlying Source) *MemoizeSource {
// and caches them before returning them, or on subsequent calls returns the
// result directly from the cache.
func (s *MemoizeSource) AvailableVersions(ctx context.Context, provider addrs.Provider) (VersionList, Warnings, error) {
shouldComputeAvailableVersions := false
s.mu.Lock()
defer s.mu.Unlock()
if existing, exists := s.availableVersions[provider]; exists {
return existing.VersionList, nil, existing.Err
entry, exists := s.availableVersions[provider]
if !exists {
// Add entry to the map
entry = &memoizeAvailableVersionsRet{}
s.availableVersions[provider] = entry
// We are now responsible for computing the entry
shouldComputeAvailableVersions = true
// Take the lock early to prevent anyone else from holding it
entry.Lock()
defer entry.Unlock()
}
ret, warnings, err := s.underlying.AvailableVersions(ctx, provider)
s.availableVersions[provider] = memoizeAvailableVersionsRet{
VersionList: ret,
Err: err,
Warnings: warnings,
s.mu.Unlock()
if shouldComputeAvailableVersions {
// Compute result, we already have the lock from above
entry.VersionList, entry.Warnings, entry.Err = s.underlying.AvailableVersions(ctx, provider)
} else {
// Wait for the result to be available
entry.Lock()
defer entry.Unlock()
}
return ret, warnings, err
return entry.VersionList, entry.Warnings, entry.Err
}
// PackageMeta requests package metadata from the underlying source and caches
// the result before returning it, or on subsequent calls returns the result
// directly from the cache.
func (s *MemoizeSource) PackageMeta(ctx context.Context, provider addrs.Provider, version Version, target Platform) (PackageMeta, error) {
s.mu.Lock()
defer s.mu.Unlock()
key := memoizePackageMetaCall{
Provider: provider,
Version: version,
Target: target,
}
if existing, exists := s.packageMetas[key]; exists {
return existing.PackageMeta, existing.Err
shouldComputePackageMeta := false
s.mu.Lock()
entry, exists := s.packageMetas[key]
if !exists {
// Add entry to the map
entry = &memoizePackageMetaRet{}
s.packageMetas[key] = entry
// We are now responsible for computing the entry
shouldComputePackageMeta = true
// Take the lock early to prevent anyone else from holding it
entry.Lock()
defer entry.Unlock()
}
ret, err := s.underlying.PackageMeta(ctx, provider, version, target)
s.packageMetas[key] = memoizePackageMetaRet{
PackageMeta: ret,
Err: err,
s.mu.Unlock()
if shouldComputePackageMeta {
// Compute result, we already have the lock from above
entry.PackageMeta, entry.Err = s.underlying.PackageMeta(ctx, provider, version, target)
} else {
// Wait for the result to be available
entry.Lock()
defer entry.Unlock()
}
return ret, err
return entry.PackageMeta, entry.Err
}
func (s *MemoizeSource) ForDisplay(provider addrs.Provider) string {

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"sync"
"github.com/opentofu/opentofu/internal/addrs"
)
@ -22,6 +23,7 @@ type MockSource struct {
packages []PackageMeta
warnings map[addrs.Provider]Warnings
calls [][]interface{}
lock sync.Mutex
}
var _ Source = (*MockSource)(nil)
@ -43,6 +45,9 @@ func NewMockSource(packages []PackageMeta, warns map[addrs.Provider]Warnings) *M
// are available in the fixed set of packages that were passed to
// NewMockSource when creating the receiving source.
func (s *MockSource) AvailableVersions(ctx context.Context, provider addrs.Provider) (VersionList, Warnings, error) {
s.lock.Lock()
defer s.lock.Unlock()
s.calls = append(s.calls, []interface{}{"AvailableVersions", provider})
var ret VersionList
for _, pkg := range s.packages {
@ -79,6 +84,9 @@ func (s *MockSource) AvailableVersions(ctx context.Context, provider addrs.Provi
// of other sources in an equivalent situation because it's a degenerate case
// with undefined results.
func (s *MockSource) PackageMeta(ctx context.Context, provider addrs.Provider, version Version, target Platform) (PackageMeta, error) {
s.lock.Lock()
defer s.lock.Unlock()
s.calls = append(s.calls, []interface{}{"PackageMeta", provider, version, target})
for _, pkg := range s.packages {

View file

@ -12,6 +12,7 @@ import (
"slices"
"sort"
"strings"
"sync"
"github.com/apparentlymart/go-versions/versions"
@ -365,36 +366,33 @@ func (i *Installer) ensureProviderVersionsNeed(
errs map[addrs.Provider]error,
) (map[addrs.Provider]getproviders.Version, error) {
evts := installerEventsForContext(ctx)
need := map[addrs.Provider]getproviders.Version{}
NeedProvider:
for provider, acceptableVersions := range mightNeed {
if err := ctx.Err(); err != nil {
// If our context has been cancelled or reached a timeout then
// we'll abort early, because subsequent operations against
// that context will fail immediately anyway.
return nil, err
}
if err := ctx.Err(); err != nil {
// If our context has been cancelled or reached a timeout then
// we'll abort early, because subsequent operations against
// that context will fail immediately anyway.
return nil, err
}
computeNeeds := func(provider addrs.Provider, acceptableVersions getproviders.VersionSet) (getproviders.Version, error) {
if cb := evts.QueryPackagesBegin; cb != nil {
cb(provider, reqs[provider], locked[provider])
}
// Version 0.0.0 not supported
if err := checkUnspecifiedVersion(acceptableVersions); err != nil {
errs[provider] = err
if cb := evts.QueryPackagesFailure; cb != nil {
cb(provider, err)
}
continue
return getproviders.Version{}, err
}
available, warnings, err := i.source.AvailableVersions(ctx, provider)
if err != nil {
errs[provider] = err
if cb := evts.QueryPackagesFailure; cb != nil {
cb(provider, err)
}
// We will take no further actions for this provider.
continue
return getproviders.Version{}, err
}
if len(warnings) > 0 {
if cb := evts.QueryPackagesWarning; cb != nil {
@ -404,11 +402,10 @@ NeedProvider:
available.Sort() // put the versions in increasing order of precedence
for i := len(available) - 1; i >= 0; i-- { // walk backwards to consider newer versions first
if acceptableVersions.Has(available[i]) {
need[provider] = available[i]
if cb := evts.QueryPackagesSuccess; cb != nil {
cb(provider, available[i])
}
continue NeedProvider
return available[i], nil
}
}
// If we get here then the source has no packages that meet the given
@ -424,12 +421,34 @@ NeedProvider:
log.Printf("[DEBUG] %s", err.Error())
log.Printf("[DEBUG] Available releases: %s", available)
}
errs[provider] = err
if cb := evts.QueryPackagesFailure; cb != nil {
cb(provider, err)
}
return getproviders.Version{}, err
}
need := map[addrs.Provider]getproviders.Version{}
var updateLock sync.Mutex
var wg sync.WaitGroup
for provider, acceptableVersions := range mightNeed {
wg.Go(func() {
// Heavy lifting
version, err := computeNeeds(provider, acceptableVersions)
// Update results
updateLock.Lock()
defer updateLock.Unlock()
if err != nil {
errs[provider] = err
} else {
need[provider] = version
}
})
}
wg.Wait()
return need, nil
}
@ -442,50 +461,64 @@ func (i *Installer) ensureProviderVersionsInstall(
targetPlatform getproviders.Platform,
errs map[addrs.Provider]error,
) (map[addrs.Provider]*getproviders.PackageAuthenticationResult, error) {
if err := ctx.Err(); err != nil {
// If our context has been cancelled or reached a timeout then
// we'll abort early, because subsequent operations against
// that context will fail immediately anyway.
return nil, err
}
authResults := map[addrs.Provider]*getproviders.PackageAuthenticationResult{} // record auth results for all successfully fetched providers
var updateLock sync.Mutex
var wg sync.WaitGroup
for provider, version := range need {
traceCtx, span := tracing.Tracer().Start(ctx,
fmt.Sprintf("Install Provider %q", provider.String()),
tracing.SpanAttributes(
traceattrs.OpenTofuProviderAddress(provider.String()),
traceattrs.OpenTofuProviderVersion(version.String()),
traceattrs.OpenTofuTargetPlatform(targetPlatform.String()),
),
)
wg.Go(func() {
traceCtx, span := tracing.Tracer().Start(ctx,
fmt.Sprintf("Install Provider %q", provider.String()),
tracing.SpanAttributes(
traceattrs.OpenTofuProviderAddress(provider.String()),
traceattrs.OpenTofuProviderVersion(version.String()),
traceattrs.OpenTofuTargetPlatform(targetPlatform.String()),
),
)
defer span.End()
if err := traceCtx.Err(); err != nil {
// If our context has been cancelled or reached a timeout then
// we'll abort early, because subsequent operations against
// that context will fail immediately anyway.
tracing.SetSpanError(span, err)
span.End()
return nil, err
}
// Heavy lifting
authResult, newHashes, err := i.ensureProviderVersionInstalled(traceCtx, locks.Provider(provider), mode, provider, version, targetPlatform)
authResult, err := i.ensureProviderVersionInstalled(traceCtx, locks, reqs, mode, provider, version, targetPlatform)
if authResult != nil {
authResults[provider] = authResult
}
if err != nil {
errs[provider] = err
}
span.End()
// Update results
updateLock.Lock()
defer updateLock.Unlock()
if err != nil {
tracing.SetSpanError(span, err)
errs[provider] = err
}
if authResult != nil {
authResults[provider] = authResult
}
if len(newHashes) > 0 {
locks.SetProvider(provider, version, reqs[provider], newHashes)
}
})
}
wg.Wait()
return authResults, nil
}
func (i *Installer) ensureProviderVersionInstalled(
ctx context.Context,
locks *depsfile.Locks,
reqs getproviders.Requirements,
lock *depsfile.ProviderLock,
mode InstallMode,
provider addrs.Provider,
version getproviders.Version,
targetPlatform getproviders.Platform,
) (*getproviders.PackageAuthenticationResult, error) {
) (*getproviders.PackageAuthenticationResult, []getproviders.Hash, error) {
evts := installerEventsForContext(ctx)
lock := locks.Provider(provider)
var preferredHashes []getproviders.Hash
if lock != nil && lock.Version() == version { // hash changes are expected if the version is also changing
@ -499,11 +532,8 @@ func (i *Installer) ensureProviderVersionInstalled(
if cb := evts.ProviderAlreadyInstalled; cb != nil {
cb(provider, version, false)
}
// Even though the package is installed, the requirements in the lockfile may still need to be updated
locks.SetProvider(provider, version, reqs[provider], lock.AllHashes())
return nil, nil
return nil, lock.AllHashes(), nil
}
}
}
@ -517,10 +547,10 @@ func (i *Installer) ensureProviderVersionInstalled(
linkTo = nil // no linking needed
}
result, err := i.ensureProviderVersionInDirectory(ctx, locks, reqs, mode, provider, version, targetPlatform, installTo)
result, newHashes, err := i.ensureProviderVersionInDirectory(ctx, lock, mode, provider, version, targetPlatform, installTo)
if err != nil {
return result, err
return result, newHashes, err
}
if linkTo != nil {
@ -536,7 +566,7 @@ func (i *Installer) ensureProviderVersionInstalled(
if cb := evts.LinkFromCacheFailure; cb != nil {
cb(provider, version, err)
}
return nil, err
return nil, nil, err
}
// We should now also find the package in the linkTo dir, which
@ -549,14 +579,14 @@ func (i *Installer) ensureProviderVersionInstalled(
if cb := evts.LinkFromCacheFailure; cb != nil {
cb(provider, version, err)
}
return nil, err
return nil, nil, err
}
if _, err := new.ExecutableFile(); err != nil {
err := fmt.Errorf("provider binary not found: %w", err)
if cb := evts.LinkFromCacheFailure; cb != nil {
cb(provider, version, err)
}
return nil, err
return nil, nil, err
}
if cb := evts.LinkFromCacheSuccess; cb != nil {
@ -564,21 +594,19 @@ func (i *Installer) ensureProviderVersionInstalled(
}
}
return result, err
return result, newHashes, err
}
func (i *Installer) ensureProviderVersionInDirectory(
ctx context.Context,
locks *depsfile.Locks,
reqs getproviders.Requirements,
lock *depsfile.ProviderLock,
mode InstallMode,
provider addrs.Provider,
version getproviders.Version,
targetPlatform getproviders.Platform,
installTo *Dir,
) (*getproviders.PackageAuthenticationResult, error) {
) (*getproviders.PackageAuthenticationResult, []getproviders.Hash, error) {
evts := installerEventsForContext(ctx)
lock := locks.Provider(provider)
var preferredHashes []getproviders.Hash
if lock != nil && lock.Version() == version { // hash changes are expected if the version is also changing
@ -596,9 +624,7 @@ func (i *Installer) ensureProviderVersionInDirectory(
}
// Even though the package is installed, the requirements in the lockfile may still need to be updated
locks.SetProvider(provider, version, reqs[provider], lock.AllHashes())
return nil, nil
return nil, lock.AllHashes(), nil
}
}
}
@ -616,7 +642,7 @@ func (i *Installer) ensureProviderVersionInDirectory(
if cb := evts.FetchPackageFailure; cb != nil {
cb(provider, version, err)
}
return nil, err
return nil, nil, err
}
// Step 3c: Retrieve the package indicated by the metadata we received,
@ -639,7 +665,7 @@ func (i *Installer) ensureProviderVersionInDirectory(
if cb := evts.FetchPackageFailure; cb != nil {
cb(provider, version, err)
}
return nil, err
return nil, nil, err
}
new := installTo.ProviderVersion(provider, version)
@ -648,14 +674,14 @@ func (i *Installer) ensureProviderVersionInDirectory(
if cb := evts.FetchPackageFailure; cb != nil {
cb(provider, version, err)
}
return nil, err
return nil, nil, err
}
if _, err := new.ExecutableFile(); err != nil {
err := fmt.Errorf("provider binary not found: %w", err)
if cb := evts.FetchPackageFailure; cb != nil {
cb(provider, version, err)
}
return nil, err
return nil, nil, err
}
// The InstallPackage call above should've verified that
@ -688,7 +714,7 @@ func (i *Installer) ensureProviderVersionInDirectory(
if cb := evts.FetchPackageFailure; cb != nil {
cb(provider, version, err)
}
return authResult, err
return authResult, nil, err
}
// localHashes is the set of hashes that we were able to verify locally
@ -723,7 +749,6 @@ func (i *Installer) ensureProviderVersionInDirectory(
newHashes = append(newHashes, localHashes...)
newHashes = append(newHashes, signedHashes...)
locks.SetProvider(provider, version, reqs[provider], newHashes)
if cb := evts.ProvidersLockUpdated; cb != nil {
// priorHashes is already sorted, but we do need to sort
// the newly-generated localHashes and signedHashes.
@ -746,7 +771,7 @@ func (i *Installer) ensureProviderVersionInDirectory(
cb(provider, version, new.PackageDir, authResult)
}
return authResult, nil
return authResult, newHashes, nil
}
// checkUnspecifiedVersion Check the presence of version 0.0.0 and return an error with a tip

View file

@ -7,6 +7,7 @@ package providercache
import (
"context"
"sync"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/getproviders"
@ -151,6 +152,142 @@ func (e *InstallerEvents) OnContext(ctx context.Context) context.Context {
return context.WithValue(ctx, ctxInstallerEvents, e)
}
// Sync returns a new InstallerEvents in which every callback forwards to the
// callback of the same name on e while holding a single mutex shared by all
// of the returned callbacks. Consumers can therefore supply callbacks that
// are not themselves safe for concurrent use, even though the events may be
// emitted from multiple goroutines.
//
// Every field of the result is non-nil. Whether the corresponding callback
// on e is set is checked on each invocation, while the mutex is held; an
// unset callback makes the event a no-op.
func (e *InstallerEvents) Sync() *InstallerEvents {
	var mu sync.Mutex

	// serialized runs fn while holding mu. The deferred unlock releases the
	// mutex even if fn panics.
	serialized := func(fn func()) {
		mu.Lock()
		defer mu.Unlock()
		fn()
	}

	return &InstallerEvents{
		PendingProviders: func(reqs map[addrs.Provider]getproviders.VersionConstraints) {
			serialized(func() {
				if cb := e.PendingProviders; cb != nil {
					cb(reqs)
				}
			})
		},
		ProviderAlreadyInstalled: func(provider addrs.Provider, selectedVersion getproviders.Version, inProviderCache bool) {
			serialized(func() {
				if cb := e.ProviderAlreadyInstalled; cb != nil {
					cb(provider, selectedVersion, inProviderCache)
				}
			})
		},
		BuiltInProviderAvailable: func(provider addrs.Provider) {
			serialized(func() {
				if cb := e.BuiltInProviderAvailable; cb != nil {
					cb(provider)
				}
			})
		},
		BuiltInProviderFailure: func(provider addrs.Provider, err error) {
			serialized(func() {
				if cb := e.BuiltInProviderFailure; cb != nil {
					cb(provider, err)
				}
			})
		},
		QueryPackagesBegin: func(provider addrs.Provider, versionConstraints getproviders.VersionConstraints, locked bool) {
			serialized(func() {
				if cb := e.QueryPackagesBegin; cb != nil {
					cb(provider, versionConstraints, locked)
				}
			})
		},
		QueryPackagesSuccess: func(provider addrs.Provider, selectedVersion getproviders.Version) {
			serialized(func() {
				if cb := e.QueryPackagesSuccess; cb != nil {
					cb(provider, selectedVersion)
				}
			})
		},
		QueryPackagesFailure: func(provider addrs.Provider, err error) {
			serialized(func() {
				if cb := e.QueryPackagesFailure; cb != nil {
					cb(provider, err)
				}
			})
		},
		QueryPackagesWarning: func(provider addrs.Provider, warn []string) {
			serialized(func() {
				if cb := e.QueryPackagesWarning; cb != nil {
					cb(provider, warn)
				}
			})
		},
		LinkFromCacheBegin: func(provider addrs.Provider, version getproviders.Version, cacheRoot string) {
			serialized(func() {
				if cb := e.LinkFromCacheBegin; cb != nil {
					cb(provider, version, cacheRoot)
				}
			})
		},
		LinkFromCacheSuccess: func(provider addrs.Provider, version getproviders.Version, localDir string) {
			serialized(func() {
				if cb := e.LinkFromCacheSuccess; cb != nil {
					cb(provider, version, localDir)
				}
			})
		},
		LinkFromCacheFailure: func(provider addrs.Provider, version getproviders.Version, err error) {
			serialized(func() {
				if cb := e.LinkFromCacheFailure; cb != nil {
					cb(provider, version, err)
				}
			})
		},
		FetchPackageMeta: func(provider addrs.Provider, version getproviders.Version) {
			serialized(func() {
				if cb := e.FetchPackageMeta; cb != nil {
					cb(provider, version)
				}
			})
		},
		FetchPackageBegin: func(provider addrs.Provider, version getproviders.Version, location getproviders.PackageLocation, inProviderCache bool) {
			serialized(func() {
				if cb := e.FetchPackageBegin; cb != nil {
					cb(provider, version, location, inProviderCache)
				}
			})
		},
		FetchPackageSuccess: func(provider addrs.Provider, version getproviders.Version, localDir string, authResult *getproviders.PackageAuthenticationResult) {
			serialized(func() {
				if cb := e.FetchPackageSuccess; cb != nil {
					cb(provider, version, localDir, authResult)
				}
			})
		},
		FetchPackageFailure: func(provider addrs.Provider, version getproviders.Version, err error) {
			serialized(func() {
				if cb := e.FetchPackageFailure; cb != nil {
					cb(provider, version, err)
				}
			})
		},
		CacheDirLockContended: func(cacheDir string) {
			serialized(func() {
				if cb := e.CacheDirLockContended; cb != nil {
					cb(cacheDir)
				}
			})
		},
		ProvidersLockUpdated: func(provider addrs.Provider, version getproviders.Version, localHashes []getproviders.Hash, signedHashes []getproviders.Hash, priorHashes []getproviders.Hash) {
			serialized(func() {
				if cb := e.ProvidersLockUpdated; cb != nil {
					cb(provider, version, localHashes, signedHashes, priorHashes)
				}
			})
		},
		ProvidersAuthenticated: func(authResults map[addrs.Provider]*getproviders.PackageAuthenticationResult) {
			serialized(func() {
				if cb := e.ProvidersAuthenticated; cb != nil {
					cb(authResults)
				}
			})
		},
	}
}
// installerEventsForContext looks on the given context for a registered
// InstallerEvents and returns a pointer to it if so.
//