mirror of
https://github.com/opentofu/opentofu.git
synced 2026-05-28 04:15:54 -04:00
Provider plugin cache locking (#1878)
Signed-off-by: Christian Mesh <christianmesh1@gmail.com>
This commit is contained in:
parent
b58afa062c
commit
d0ee5a36a5
10 changed files with 246 additions and 32 deletions
|
|
@ -6,9 +6,11 @@
|
|||
package e2etest
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/e2e"
|
||||
|
|
@ -90,3 +92,43 @@ func TestProviderProtocols(t *testing.T) {
|
|||
t.Fatalf("wrong destroy output\nstdout:%s\nstderr:%s", stdout, stderr)
|
||||
}
|
||||
}
|
||||
|
||||
// This test is designed to simulate a *very* busy CI server that has multiple
|
||||
// processes sharing a global provider cache. This exercises the locking in the
|
||||
// "providercache" package, as well as simulating bad file hashes in the
|
||||
// lock file.
|
||||
func TestProviderGlobalCache(t *testing.T) {
|
||||
if !canAccessNetwork() {
|
||||
t.Skip("Requires provider download access for e2e provider interactions")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
|
||||
tmpDir, err := filepath.EvalSymlinks(t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rcLoc := filepath.Join(tmpDir, ".tofurc")
|
||||
rcData := fmt.Sprintf(`plugin_cache_dir = "%s"`, tmpDir)
|
||||
err = os.WriteFile(rcLoc, []byte(rcData), 0600)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < 16; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
tf := e2e.NewBinary(t, tofuBin, "testdata/provider-global-cache")
|
||||
tf.AddEnv(fmt.Sprintf("TF_CLI_CONFIG_FILE=%s", rcLoc))
|
||||
|
||||
stdout, stderr, err := tf.Run("init")
|
||||
tofuResult{t, stdout, stderr, err}.Success()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
|||
19
internal/command/e2etest/testdata/provider-global-cache/.terraform.lock.hcl
vendored
Normal file
19
internal/command/e2etest/testdata/provider-global-cache/.terraform.lock.hcl
vendored
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
# This is intentionally broken
|
||||
|
||||
provider "registry.opentofu.org/hashicorp/tfcoremock" {
|
||||
version = "0.1.1"
|
||||
constraints = "0.1.1"
|
||||
hashes = [
|
||||
"h1:4piQYSNK4ba+swomwcyLysPAzoglQI1L2252P9zfKdQ=",
|
||||
"zh:16ca93ba1d5c1e0b772034a61038a115a2f24653c350d708f99aee12bf5505bf",
|
||||
"zh:1d4c8c6607b08e754557b55fe2f1bff269d92ebc25d911395308db80ab78b0cf",
|
||||
"zh:2ac82fb3d3f330af2737effaa63c9947175567fe18f1e0d527c038e48e71a923",
|
||||
"zh:2ae3cd3ced534c72af4b1c99098c8c2d14e2c98cf8e3c290d224ee71c3d51e4f",
|
||||
"zh:39c6c01d59103bf7fd2936b02fc2405d70c2bf28778a2c6335d7e38d468cbca1",
|
||||
"zh:a4413dee6ff50862c5258ede5e425205b6d94413d13db1479efd28e63364ce3e",
|
||||
"zh:a47484f8faff6787baab6c5df685ce8f478ad59d02f3b603aaca58b4ba2aa8cb",
|
||||
"zh:bc1a555a13793a82c720d5d155472ecf72115954fdf221cd6578d3270906dc3e",
|
||||
"zh:d6a0f5c61e5fe0c7a70f991cbd193ede37394f75840064a15c20981ce00f955e",
|
||||
"zh:f0ece5af834bb1be30faa57cd9965096d70c3ed02177fa72510e3fe9f1f9047a",
|
||||
]
|
||||
}
|
||||
8
internal/command/e2etest/testdata/provider-global-cache/main.tf
vendored
Normal file
8
internal/command/e2etest/testdata/provider-global-cache/main.tf
vendored
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
terraform {
|
||||
required_providers {
|
||||
tfcoremock = {
|
||||
source = "tfcoremock"
|
||||
version = "0.1.1"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -6,18 +6,17 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
package statemgr
|
||||
package flock
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// use fcntl POSIX locks for the most consistent behavior across platforms, and
|
||||
// hopefully some compatibility over NFS and CIFS.
|
||||
func (s *Filesystem) lock() error {
|
||||
log.Printf("[TRACE] statemgr.Filesystem: locking %s using fcntl flock", s.path)
|
||||
func Lock(f *os.File) error {
|
||||
flock := &syscall.Flock_t{
|
||||
Type: syscall.F_RDLCK | syscall.F_WRLCK,
|
||||
Whence: int16(io.SeekStart),
|
||||
|
|
@ -25,12 +24,10 @@ func (s *Filesystem) lock() error {
|
|||
Len: 0,
|
||||
}
|
||||
|
||||
fd := s.stateFileOut.Fd()
|
||||
return syscall.FcntlFlock(fd, syscall.F_SETLK, flock)
|
||||
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLK, flock)
|
||||
}
|
||||
|
||||
func (s *Filesystem) unlock() error {
|
||||
log.Printf("[TRACE] statemgr.Filesystem: unlocking %s using fcntl flock", s.path)
|
||||
func Unlock(f *os.File) error {
|
||||
flock := &syscall.Flock_t{
|
||||
Type: syscall.F_UNLCK,
|
||||
Whence: int16(io.SeekStart),
|
||||
|
|
@ -38,6 +35,5 @@ func (s *Filesystem) unlock() error {
|
|||
Len: 0,
|
||||
}
|
||||
|
||||
fd := s.stateFileOut.Fd()
|
||||
return syscall.FcntlFlock(fd, syscall.F_SETLK, flock)
|
||||
return syscall.FcntlFlock(f.Fd(), syscall.F_SETLK, flock)
|
||||
}
|
||||
|
|
@ -6,11 +6,11 @@
|
|||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
package statemgr
|
||||
package flock
|
||||
|
||||
import (
|
||||
"log"
|
||||
"math"
|
||||
"os"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
)
|
||||
|
|
@ -28,9 +28,8 @@ const (
|
|||
_LOCKFILE_EXCLUSIVE_LOCK = 2
|
||||
)
|
||||
|
||||
func (s *Filesystem) lock() error {
|
||||
log.Printf("[TRACE] statemgr.Filesystem: locking %s using LockFileEx", s.path)
|
||||
|
||||
// This still alows the file handle to be opened by another process for competing locks on the same file.
|
||||
func Lock(f *os.File) error {
|
||||
// even though we're failing immediately, an overlapped event structure is
|
||||
// required
|
||||
ol, err := newOverlapped()
|
||||
|
|
@ -40,7 +39,7 @@ func (s *Filesystem) lock() error {
|
|||
defer syscall.CloseHandle(ol.HEvent)
|
||||
|
||||
return lockFileEx(
|
||||
syscall.Handle(s.stateFileOut.Fd()),
|
||||
syscall.Handle(f.Fd()),
|
||||
_LOCKFILE_EXCLUSIVE_LOCK|_LOCKFILE_FAIL_IMMEDIATELY,
|
||||
0, // reserved
|
||||
0, // bytes low
|
||||
|
|
@ -49,10 +48,8 @@ func (s *Filesystem) lock() error {
|
|||
)
|
||||
}
|
||||
|
||||
func (s *Filesystem) unlock() error {
|
||||
log.Printf("[TRACE] statemgr.Filesystem: unlocked by closing %s", s.path)
|
||||
|
||||
// the file is closed in Unlock
|
||||
func Unlock(*os.File) error {
|
||||
// the lock is released when Close() is called
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -8,6 +8,8 @@ package getproviders
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/hashicorp/go-getter"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.30.0"
|
||||
|
|
@ -67,13 +69,26 @@ func (p PackageLocalArchive) InstallProviderPackage(ctx context.Context, meta Pa
|
|||
filename := meta.Location.String()
|
||||
span.SetAttributes(semconv.FilePath(filename))
|
||||
|
||||
// NOTE: We're not checking whether there's already a directory at
|
||||
// targetDir with some files in it. Packages are supposed to be immutable
|
||||
// and therefore we'll just be overwriting all of the existing files with
|
||||
// their same contents unless something unusual is happening. If something
|
||||
// unusual _is_ happening then this will produce something that doesn't
|
||||
// match the allowed hashes and so our caller should catch that after
|
||||
// we return if so.
|
||||
// NOTE: Packages are immutable, but we may want to skip overwriting the existing
|
||||
// files in due to specific scenarios defined below.
|
||||
|
||||
if _, err := os.Stat(targetDir); err == nil {
|
||||
// If the package might already be installed, we should try to skip overwriting the contents.
|
||||
// When run with TF_PLUGIN_CACHE_DIR or similar, a given provider might already be executing
|
||||
// and therefore locking the provider binary in the target directory (preventing the overwrite below)
|
||||
//
|
||||
// This does incur the overhead of two additional hash computations and could be
|
||||
// skipped with smarter checks around re-use scenarios in the future.
|
||||
|
||||
targetHash, targetErr := PackageHashV1(PackageLocalDir(targetDir))
|
||||
fileHash, fileErr := PackageHashV1(meta.Location)
|
||||
|
||||
if targetHash == fileHash && fileErr == nil && targetErr == nil {
|
||||
// Package is properly installed, bad or missing lock file will be caught elsewhere
|
||||
log.Printf("[INFO] Skipping local installation of provider %s %s as the existing contents already match the new contents", meta.Provider, meta.Version)
|
||||
return authResult, nil
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:mnd // magic number predates us using this linter
|
||||
err := unzip.Decompress(targetDir, filename, true, 0000)
|
||||
|
|
|
|||
|
|
@ -6,11 +6,16 @@
|
|||
package providercache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/addrs"
|
||||
"github.com/opentofu/opentofu/internal/flock"
|
||||
"github.com/opentofu/opentofu/internal/getproviders"
|
||||
)
|
||||
|
||||
|
|
@ -42,6 +47,12 @@ type Dir struct {
|
|||
// directory made by other codepaths because the contract for NewDir
|
||||
// explicitly defines using the same directory for multiple purposes
|
||||
// as undefined behavior.
|
||||
// However, this code is now used for the global provider cache. With
|
||||
// the added support for locking, the data may no longer be valid with
|
||||
// changes from other processes. In practice this means that some packages
|
||||
// may have been installed since the latest re-scan. The code that
|
||||
// handles the installation should be smart enough to detect that and
|
||||
// work around it.
|
||||
metaCache map[addrs.Provider][]CachedProvider
|
||||
}
|
||||
|
||||
|
|
@ -77,6 +88,95 @@ func (d *Dir) BasePath() string {
|
|||
return filepath.Clean(d.baseDir)
|
||||
}
|
||||
|
||||
func (d *Dir) Lock(ctx context.Context, provider addrs.Provider, version getproviders.Version) (func() error, error) {
|
||||
providerPath := getproviders.UnpackedDirectoryPathForPackage(d.baseDir, provider, version, d.targetPlatform)
|
||||
|
||||
// If the lockfile is put within the target directory, it can mess with hashing
|
||||
// Instead we add a suffix to the last part of the path (targetplatform) and lock that file instead.
|
||||
dirPath := filepath.Dir(providerPath)
|
||||
lockFileName := filepath.Base(providerPath) + ".lock"
|
||||
lockFile := filepath.Join(dirPath, lockFileName)
|
||||
|
||||
log.Printf("[TRACE] Attempting to acquire global provider lock %s", lockFile)
|
||||
|
||||
// Ensure the provider directory exists
|
||||
//nolint: mnd // directory permissions
|
||||
if err := os.MkdirAll(dirPath, 0755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var f *os.File
|
||||
|
||||
// Try to create the lock file, wait up to 1 second for transient errors to clear.
|
||||
for timeout := time.After(time.Second); ; {
|
||||
var err error
|
||||
|
||||
// Try to get a handle to the file (or create if it does not exist)
|
||||
// They will all end up with the same file handle on any correctly implemented filesystem.
|
||||
// This is one of the many reasons we recommend users look at the flock support of their
|
||||
// networked filesystems when using the global provider cache.
|
||||
// Windows: even though out flock creates an exclusive lock, we are still able to open a handle to this file and wait below for the actual lock to be provided.
|
||||
// Sometimes the creates can conflict and will need to be tried multiple times (incredibly uncommon).
|
||||
f, err = os.OpenFile(lockFile, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err == nil {
|
||||
// We don't defer f.Close() here as we explicitly want to handle it below
|
||||
break
|
||||
}
|
||||
|
||||
select {
|
||||
case <-timeout:
|
||||
return nil, err
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
// Chill for a bit before trying again
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the file lock for up to 60s. Might make sense to have the timeout be configurable for different network conditions / package sizes.
|
||||
for timeout := time.After(time.Second * 60); ; {
|
||||
// We have a valid file handle, let's try to lock it (nonblocking)
|
||||
err := flock.Lock(f)
|
||||
if err == nil {
|
||||
// Lock succeeded
|
||||
break
|
||||
}
|
||||
|
||||
select {
|
||||
case <-timeout:
|
||||
if f != nil {
|
||||
f.Close()
|
||||
}
|
||||
return nil, fmt.Errorf("Unable to acquire file lock on %q: %w", lockFile, err)
|
||||
case <-ctx.Done():
|
||||
if f != nil {
|
||||
f.Close()
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
// Chill for a bit before trying again
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
log.Printf("[TRACE] Acquired global provider lock %s", lockFile)
|
||||
|
||||
return func() error {
|
||||
log.Printf("[TRACE] Releasing global provider lock %s", lockFile)
|
||||
|
||||
unlockErr := flock.Unlock(f)
|
||||
|
||||
// Prefer close error over unlock error
|
||||
err := f.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return unlockErr
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AllAvailablePackages returns a description of all of the packages already
|
||||
// present in the directory. The cache entries are grouped by the provider
|
||||
// they relate to and then sorted by version precedence, with highest
|
||||
|
|
|
|||
|
|
@ -483,6 +483,18 @@ func (i *Installer) ensureProviderVersionInstall(
|
|||
) (*getproviders.PackageAuthenticationResult, error) {
|
||||
evts := installerEventsForContext(ctx)
|
||||
|
||||
// The unlock function may be used if the globalCacheDir is set. It is unlocked in the *next* iteration of the loop or after the loop exits
|
||||
// We create a file lock per-provider to prevent modification from different processes using the shared global cache
|
||||
// We lock per-provider to prevent deadlocks and release as soon as possible (instead of defer at the function scope)
|
||||
// This is not ideal, but the best option for not generating a large code diff
|
||||
var unlock func()
|
||||
defer func() {
|
||||
if unlock != nil {
|
||||
// Free remaining lock on return
|
||||
unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
lock := locks.Provider(provider)
|
||||
var preferredHashes []getproviders.Hash
|
||||
if lock != nil && lock.Version() == version { // hash changes are expected if the version is also changing
|
||||
|
|
@ -502,6 +514,21 @@ func (i *Installer) ensureProviderVersionInstall(
|
|||
}
|
||||
|
||||
if i.globalCacheDir != nil {
|
||||
// Try to lock the provider's directory.
|
||||
unlockProvider, err := i.globalCacheDir.Lock(ctx, provider, version)
|
||||
if err != nil {
|
||||
if cb := evts.LinkFromCacheFailure; cb != nil {
|
||||
cb(provider, version, err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
unlock = func() {
|
||||
err = unlockProvider()
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] Unable to clear provider lock: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// If our global cache already has this version available then
|
||||
// we'll just link it in.
|
||||
installed, err := tryInstallPackageFromCacheDir(
|
||||
|
|
@ -567,6 +594,13 @@ func (i *Installer) ensureProviderVersionInstall(
|
|||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if unlock != nil {
|
||||
// Early unlock as the write to the globalCacheDir has completed
|
||||
unlock()
|
||||
unlock = nil
|
||||
}
|
||||
|
||||
new := installTo.ProviderVersion(provider, version)
|
||||
if new == nil {
|
||||
err := fmt.Errorf("after installing %s it is still not detected in %s; this is a bug in OpenTofu", provider, installTo.BasePath())
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import (
|
|||
multierror "github.com/hashicorp/go-multierror"
|
||||
|
||||
"github.com/opentofu/opentofu/internal/encryption"
|
||||
"github.com/opentofu/opentofu/internal/flock"
|
||||
"github.com/opentofu/opentofu/internal/states"
|
||||
"github.com/opentofu/opentofu/internal/states/statefile"
|
||||
"github.com/opentofu/opentofu/internal/tofu"
|
||||
|
|
@ -339,7 +340,8 @@ func (s *Filesystem) Lock(info *LockInfo) (string, error) {
|
|||
return "", fmt.Errorf("state %q already locked", s.stateFileOut.Name())
|
||||
}
|
||||
|
||||
if err := s.lock(); err != nil {
|
||||
log.Printf("[TRACE] statemgr.Filesystem: locking %s", s.path)
|
||||
if err := flock.Lock(s.stateFileOut); err != nil {
|
||||
info, infoErr := s.lockInfo()
|
||||
if infoErr != nil {
|
||||
err = multierror.Append(err, infoErr)
|
||||
|
|
@ -391,7 +393,8 @@ func (s *Filesystem) Unlock(id string) error {
|
|||
}
|
||||
fileName := s.stateFileOut.Name()
|
||||
|
||||
unlockErr := s.unlock()
|
||||
log.Printf("[TRACE] statemgr.Filesystem: unlocking %s", s.path)
|
||||
unlockErr := flock.Unlock(s.stateFileOut)
|
||||
|
||||
s.stateFileOut.Close()
|
||||
s.stateFileOut = nil
|
||||
|
|
|
|||
|
|
@ -375,9 +375,9 @@ been placed there. Over time, as plugins are upgraded, the cache directory may
|
|||
grow to contain several unused versions which you must delete manually.
|
||||
|
||||
:::note
|
||||
The plugin cache directory is not guaranteed to be concurrency
|
||||
safe. The provider installer's behavior in environments with multiple `tofu
|
||||
init` calls is undefined.
|
||||
The plugin cache directory makes a best effort to be concurrency
|
||||
safe. It uses standard file locking practices (fnctl flock or LockFileEx),
|
||||
which have different guarantees depending on Operating System and filesystem.
|
||||
:::
|
||||
|
||||
### Allowing the Provider Plugin Cache to break the dependency lock file
|
||||
|
|
|
|||
Loading…
Reference in a new issue