mirror of https://github.com/hashicorp/vault.git (synced 2026-02-18 18:38:08 -05:00)
VAULT-35369: Loaded snapshots CE (#30529)
* full load
* snapshot manager tested
* integration test
* more tests
* remove obsolete test
* fix failing test
* move testdata to ent folder
* add test for RaftDataDirPath
* fix race condition, don't create new barrier instance
* check for nil result
* remove encryption from the barrier storage wrapper
* Update physical/raft/fsm.go

Co-authored-by: Kuba Wieczorek <kuba.wieczorek@hashicorp.com>

* fmt

---------

Co-authored-by: Kuba Wieczorek <kuba.wieczorek@hashicorp.com>
This commit is contained in:
parent c04e486e2a
commit bea32a14ab
9 changed files with 135 additions and 139 deletions
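Taken together, the pieces in this diff let a caller load a Raft snapshot into a standalone, read-only FSM and query it without joining a cluster. The following is a minimal sketch of that flow, not code from the change itself: the package name, directory and key names, and the logger choice are illustrative, while NewReadOnlyFSM, LoadReadOnlySnapshot, and FSM.Get are the functions introduced or touched below.

package example

import (
    "context"
    "os"

    log "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/vault/physical/raft"
    "github.com/hashicorp/vault/sdk/physical"
)

// loadAndRead is a sketch: open a snapshot, load it into a read-only FSM
// rooted at dataDir, and read back a single key.
func loadAndRead(dataDir, snapshotPath, key string) (*physical.Entry, error) {
    logger := log.Default()

    snapshotFile, err := os.Open(snapshotPath)
    if err != nil {
        return nil, err
    }
    // The caller is responsible for closing the reader.
    defer snapshotFile.Close()

    fsm, err := raft.NewReadOnlyFSM(dataDir, "loaded-snapshot", logger)
    if err != nil {
        return nil, err
    }

    // A nil filter loads every key from the snapshot.
    if err := raft.LoadReadOnlySnapshot(fsm, snapshotFile, nil, logger); err != nil {
        return nil, err
    }

    return fsm.Get(context.Background(), key)
}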
@ -297,6 +297,10 @@ func ClusterSetup(conf *vault.CoreConfig, opts *vault.TestClusterOptions, setup
     if opts != nil {
         localOpts = *opts
     }
+    if localOpts.HandlerFunc == nil {
+        localOpts.HandlerFunc = vaulthttp.Handler
+    }
+
     if setup == nil {
         setup = InmemBackendSetup
     }
@ -180,10 +180,20 @@ type FSM struct {
     localID         string
     desiredSuffrage string
+    // metricSuffix should contain a dash, since it will be appended directly to the end of the key string.
+    metricSuffix string
 }

+func NewReadOnlyFSM(path string, localID string, logger log.Logger) (*FSM, error) {
+    return newFSM(path, localID, logger, "-readonly")
+}
+
 // NewFSM constructs a FSM using the given directory
 func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) {
+    return newFSM(path, localID, logger, "")
+}
+
+func newFSM(path string, localID string, logger log.Logger, metricSuffix string) (*FSM, error) {
     // Initialize the latest term, index, and config values
     latestTerm := new(uint64)
     latestIndex := new(uint64)
@ -204,6 +214,7 @@ func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) {
         // setup if this is already part of a cluster with a desired suffrage.
         desiredSuffrage: "voter",
         localID:         localID,
+        metricSuffix:    metricSuffix,
     }

     f.chunker = &logVerificationChunkingShim{
@ -559,8 +570,8 @@ func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
 // Get retrieves the value at the given path from the bolt file.
 func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
     // TODO: Remove this outdated metric name in a future release
-    defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
-    defer metrics.MeasureSince([]string{"raft_storage", "fsm", "get"}, time.Now())
+    defer metrics.MeasureSince([]string{"raft", fmt.Sprintf("get%s", f.metricSuffix)}, time.Now())
+    defer metrics.MeasureSince([]string{"raft_storage", "fsm", fmt.Sprintf("get%s", f.metricSuffix)}, time.Now())

     f.l.RLock()
     defer f.l.RUnlock()
@ -607,8 +618,8 @@ func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
 // List retrieves the set of keys with the given prefix from the bolt file.
 func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
     // TODO: Remove this outdated metric name in a future release
-    defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
-    defer metrics.MeasureSince([]string{"raft_storage", "fsm", "list"}, time.Now())
+    defer metrics.MeasureSince([]string{"raft", fmt.Sprintf("list%s", f.metricSuffix)}, time.Now())
+    defer metrics.MeasureSince([]string{"raft_storage", "fsm", fmt.Sprintf("list%s", f.metricSuffix)}, time.Now())

     f.l.RLock()
     defer f.l.RUnlock()
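Because newFSM threads the suffix into every MeasureSince call, reads against a loaded snapshot are reported under different metric keys than live raft storage. A small sketch of the consequence, assuming the physical/raft import path, an hclog logger, and illustrative directories:

package example

import (
    log "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/vault/physical/raft"
)

func newFSMs(dir string, logger log.Logger) error {
    // Regular FSM: metrics are emitted under the bare keys,
    // e.g. {"raft", "get"} and {"raft_storage", "fsm", "list"}.
    if _, err := raft.NewFSM(dir+"/live", "node-1", logger); err != nil {
        return err
    }

    // Read-only FSM: the "-readonly" suffix lands on the final key element,
    // e.g. {"raft", "get-readonly"} and {"raft_storage", "fsm", "list-readonly"},
    // keeping loaded-snapshot traffic separate from live storage traffic.
    if _, err := raft.NewReadOnlyFSM(dir+"/loaded", "node-1", logger); err != nil {
        return err
    }
    return nil
}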
@ -10,6 +10,7 @@ import (
     "errors"
     "fmt"
     "io"
+    "maps"
     "math/rand"
     "net/url"
     "os"
@ -2336,3 +2337,10 @@ func (b *RaftBackend) ReloadConfig(config raft.ReloadableConfig) error {
     }
     return nil
 }
+
+// GetStorageConfig returns a map with the raft configuration from the storage stanza in the config file.
+func (b *RaftBackend) GetStorageConfig() map[string]string {
+    storageConfig := make(map[string]string)
+    maps.Copy(storageConfig, b.conf)
+    return storageConfig
+}
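GetStorageConfig hands back a copy of the backend's configuration map rather than the map itself, so callers can read (or even mutate) the result without touching the backend's own state. A sketch of the calling pattern that core.go relies on further down; the *raft.RaftBackend value is assumed to come from elsewhere:

package example

import "github.com/hashicorp/vault/physical/raft"

// raftDataPath is a sketch of reading the configured data directory off a
// RaftBackend via the new accessor.
func raftDataPath(b *raft.RaftBackend) (string, bool) {
    cfg := b.GetStorageConfig() // copy of the storage stanza, safe to modify
    path, ok := cfg["path"]
    return path, ok && path != ""
}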
@ -19,7 +19,6 @@ import (

     "github.com/golang/protobuf/proto"
     log "github.com/hashicorp/go-hclog"
-    iradix "github.com/hashicorp/go-immutable-radix"
     "github.com/hashicorp/raft"
     "github.com/hashicorp/vault/sdk/plugin/pb"
     "github.com/rboyer/safeio"
@ -504,8 +503,8 @@ func snapshotName(term, index uint64) string {
 // FSM. The caller is responsible for closing the reader.
 // If pathsToFilter is not nil, the function will filter out any keys that are
 // found in the pathsToFilter tree.
-func LoadReadOnlySnapshot(fsm *FSM, snapshotFile io.ReadCloser, pathsToFilter *iradix.Tree, logger log.Logger) error {
-    return loadSnapshot(fsm.db, logger, snapshotFile, pathsToFilter, true)
+func LoadReadOnlySnapshot(fsm *FSM, snapshotFile io.ReadCloser, filterKey func(key string) bool, logger log.Logger) error {
+    return loadSnapshot(fsm.db, logger, snapshotFile, filterKey, true)
 }

 // loadSnapshot loads a snapshot from a file into the supplied boltDB database.
@ -515,7 +514,7 @@ func LoadReadOnlySnapshot(fsm *FSM, snapshotFile io.ReadCloser, pathsToFilter *i
 // to 1.0.
 // If pathsToFilter is not nil, the function will filter out any keys that are
 // found in the pathsToFilter tree.
-func loadSnapshot(db *bolt.DB, logger log.Logger, snapshotFile io.ReadCloser, pathsToFilter *iradix.Tree, readOnly bool) error {
+func loadSnapshot(db *bolt.DB, logger log.Logger, snapshotFile io.ReadCloser, filterKey func(key string) bool, readOnly bool) error {
     // The delimited reader will parse full proto messages from the snapshot data.
     protoReader := NewDelimitedReader(snapshotFile, math.MaxInt32)
     defer protoReader.Close()
@ -545,12 +544,8 @@ func loadSnapshot(db *bolt.DB, logger log.Logger, snapshotFile io.ReadCloser, pa
         return err
     }

-    if pathsToFilter != nil {
-        keyToLookUp := []byte(entry.Key)
-        _, _, ok := pathsToFilter.Root().LongestPrefix(keyToLookUp)
-        if ok {
-            continue
-        }
+    if filterKey != nil && filterKey(entry.Key) {
+        continue
+    }

     err = b.Put([]byte(entry.Key), entry.Value)
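Since loadSnapshot now takes a plain predicate instead of an *iradix.Tree, callers that don't already have a radix tree can filter with something as simple as a prefix check. A sketch under that assumption; the helper name and the prefixes are hypothetical, and returning true means "skip this key", matching the continue above:

package example

import "strings"

// excludePrefixes builds a filterKey callback for LoadReadOnlySnapshot that
// skips any snapshot entry under one of the given path prefixes.
func excludePrefixes(prefixes ...string) func(key string) bool {
    return func(key string) bool {
        for _, p := range prefixes {
            if strings.HasPrefix(key, p) {
                return true // filter this key out of the loaded FSM
            }
        }
        return false
    }
}

A caller would then pass it straight through, for example LoadReadOnlySnapshot(fsm, snapshotFile, excludePrefixes("some/prefix/"), logger), as the test below does with a radix-tree-backed closure.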
@ -986,11 +986,15 @@ func TestLoadReadOnlySnapshot(t *testing.T) {
     _, _ = txn.Insert([]byte("/path/to/exclude/1"), []byte("value"))
     _, _ = txn.Insert([]byte("/different/path/to/exclude"), []byte("value"))
     pathsToExclude = txn.Commit()
+    toExclude := func(key string) bool {
+        _, _, ok := pathsToExclude.Root().LongestPrefix([]byte(key))
+        return ok
+    }

     // Create an FSM to load the snapshot data into.
     fsm, err := NewFSM(dir, "test-fsm", logger)

-    err = LoadReadOnlySnapshot(fsm, snapshotFile, pathsToExclude, logger)
+    err = LoadReadOnlySnapshot(fsm, snapshotFile, toExclude, logger)
     require.NoError(t, err)
     value, err := fsm.Get(context.Background(), "/path/to/exclude/1")
     require.NoError(t, err)
@ -3,7 +3,13 @@

 package vault

-import "context"
+import (
+    "context"
+    "errors"
+
+    "github.com/hashicorp/vault/sdk/logical"
+    "github.com/hashicorp/vault/sdk/physical"
+)

 // BarrierEncryptorAccess is a wrapper around BarrierEncryptor that allows Core
 // to expose its barrier encrypt/decrypt operations through BarrierEncryptorAccess()
@ -25,3 +31,56 @@ func (b *BarrierEncryptorAccess) Encrypt(ctx context.Context, key string, plaint
 func (b *BarrierEncryptorAccess) Decrypt(ctx context.Context, key string, ciphertext []byte) ([]byte, error) {
     return b.barrierEncryptor.Decrypt(ctx, key, ciphertext)
 }
+
+// NewBarrierDecryptingStorage returns a view of storage that will decrypt the
+// storage values for Get operations. The returned storage is read-only, and
+// will error on attempts to Put or Delete.
+func NewBarrierDecryptingStorage(
+    barrier BarrierEncryptor, underlying physical.Backend,
+) logical.Storage {
+    return &barrierDecryptingStorage{
+        barrier:    barrier,
+        underlying: underlying,
+    }
+}
+
+type barrierDecryptingStorage struct {
+    barrier    BarrierEncryptor
+    underlying physical.Backend
+}
+
+func (b *barrierDecryptingStorage) List(ctx context.Context, prefix string) ([]string, error) {
+    return b.underlying.List(ctx, prefix)
+}
+
+// ignore-nil-nil-function-check
+func (b *barrierDecryptingStorage) Get(ctx context.Context, key string) (*logical.StorageEntry, error) {
+    entry, err := b.underlying.Get(ctx, key)
+    if err != nil {
+        return nil, err
+    }
+    if entry == nil {
+        return nil, nil
+    }
+    decrypted, err := b.barrier.Decrypt(ctx, entry.Key, entry.Value)
+    if err != nil {
+        return nil, err
+    }
+    return &logical.StorageEntry{
+        Key:      entry.Key,
+        Value:    decrypted,
+        SealWrap: entry.SealWrap,
+    }, nil
+}
+
+var errReadOnly = errors.New("read-only storage")
+
+func (b *barrierDecryptingStorage) Put(ctx context.Context, entry *logical.StorageEntry) error {
+    return errReadOnly
+}
+
+func (b *barrierDecryptingStorage) Delete(ctx context.Context, key string) error {
+    return errReadOnly
+}
+
+var _ logical.Storage = (*barrierDecryptingStorage)(nil)
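NewBarrierDecryptingStorage wraps a physical.Backend in a read-only logical.Storage view whose Get decrypts values through the barrier, while Put and Delete are rejected. A sketch of how such a view might be used, written as if it sat in the vault package since BarrierEncryptor lives there; the function name and the idea of pointing it at a snapshot-loaded FSM are assumptions, not taken from the change:

package vault

import (
    "context"

    "github.com/hashicorp/vault/sdk/physical"
)

// readDecrypted is a sketch: wrap an already-populated backend (for example a
// read-only FSM loaded from a snapshot) so reads come back decrypted.
func readDecrypted(ctx context.Context, barrier BarrierEncryptor, backend physical.Backend, key string) ([]byte, error) {
    store := NewBarrierDecryptingStorage(barrier, backend)

    entry, err := store.Get(ctx, key) // value is decrypted via the barrier
    if err != nil {
        return nil, err
    }
    if entry == nil {
        return nil, nil
    }

    // Writes are rejected: store.Put and store.Delete return errReadOnly.
    return entry.Value, nil
}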
@ -1577,3 +1577,37 @@ func TestRaftCluster_Removed_ReAdd(t *testing.T) {
     _, err = follower.Client.Sys().RaftJoin(joinReq)
     require.Error(t, err)
 }
+
+// TestCore_RaftDataDirPath verifies that the RaftDataDirPath method returns a
+// data path when the storage backend is raft, and no data path when the storage
+// is not raft.
+func TestCore_RaftDataDirPath(t *testing.T) {
+    t.Parallel()
+    t.Run("raft", func(t *testing.T) {
+        t.Parallel()
+        cluster, _ := raftCluster(t, nil)
+        defer cluster.Cleanup()
+        path, ok := cluster.Cores[0].RaftDataDirPath()
+        require.True(t, ok)
+        require.NotEmpty(t, path)
+    })
+    t.Run("inmem", func(t *testing.T) {
+        t.Parallel()
+        core, _, _ := vault.TestCoreUnsealed(t)
+        defer core.Shutdown()
+        path, ok := core.RaftDataDirPath()
+        require.False(t, ok)
+        require.Empty(t, path)
+    })
+    t.Run("raft ha", func(t *testing.T) {
+        t.Parallel()
+        var conf vault.CoreConfig
+        opts := vault.TestClusterOptions{HandlerFunc: vaulthttp.Handler}
+        teststorage.RaftHASetup(&conf, &opts, teststorage.MakeFileBackend)
+        cluster := vault.NewTestCluster(t, &conf, &opts)
+        defer cluster.Cleanup()
+        path, ok := cluster.Cores[0].RaftDataDirPath()
+        require.False(t, ok)
+        require.Empty(t, path)
+    })
+}
@ -1529,16 +1529,13 @@ func (c *Core) isRaftUnseal() bool {
 // RaftDataDirPath returns the string path to the raft data directory and true,
 // or an empty string and false if it fails to find it or if the value is an empty string.
 func (c *Core) RaftDataDirPath() (string, bool) {
-    config := c.GetCoreConfigInternal()
-    if config == nil || config.Storage == nil || config.Storage.Config == nil {
+    p, ok := c.underlyingPhysical.(*raft.RaftBackend)
+    if !ok {
         return "", false
     }

-    if config.Storage.Type != "raft" {
-        return "", false
-    }
-
-    path, ok := config.Storage.Config["path"]
+    storageConfig := p.GetStorageConfig()
+    path, ok := storageConfig["path"]
     if !ok || path == "" {
         return "", false
     }
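RaftDataDirPath now asks the running raft backend for its storage configuration (via the type assertion and GetStorageConfig) instead of re-reading the parsed server config, and it reports ok=false when the underlying storage isn't raft or when no path is set. A sketch of the calling pattern; the function name and what a caller does with the path are illustrative:

package example

import (
    "fmt"

    "github.com/hashicorp/vault/vault"
)

// snapshotDataDir is a sketch: only proceed when the node really is backed by
// raft storage and a data directory is configured.
func snapshotDataDir(core *vault.Core) (string, error) {
    path, ok := core.RaftDataDirPath()
    if !ok {
        return "", fmt.Errorf("storage backend is not raft or has no data path configured")
    }
    return path, nil
}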
@ -4,11 +4,7 @@
 package vault

 import (
     "fmt"
     "testing"

     "github.com/hashicorp/vault/command/server"
     "github.com/stretchr/testify/require"
 )

 // TestFormatDiscoveredAddr validates that the string returned by formatDiscoveredAddr always respect the format `host:port`.
@ -35,115 +31,3 @@ func TestFormatDiscoveredAddr(t *testing.T) {
         }
     }
 }
-
-// TestRaftDirPath verifies that the Raft data directory path is correctly extracted from the storage configuration.
-// It exercises all execution paths in the RaftDataDirPath to ensure that:
-// - The path is correctly extracted on a happy path.
-// - In case of an empty path in config, it returns an empty string and true.
-// - If there are any issues getting the path, it returns an empty string and false.
-func TestRaftDirPath(t *testing.T) {
-    testRaftPath := "/storage/path/raft"
-
-    testCases := map[string]struct {
-        config           *CoreConfig
-        expectedRaftPath string
-        shouldError      bool
-    }{
-        "happy-path": {
-            config: &CoreConfig{
-                RawConfig: &server.Config{
-                    Storage: &server.Storage{
-                        Type: "raft",
-                        Config: map[string]string{
-                            "path": testRaftPath,
-                        },
-                    },
-                },
-            },
-            expectedRaftPath: testRaftPath,
-        },
-        "empty-raft-data-dir-path": {
-            config: &CoreConfig{
-                RawConfig: &server.Config{
-                    Storage: &server.Storage{
-                        Type: "raft",
-                        Config: map[string]string{
-                            "path": "",
-                        },
-                    },
-                },
-            },
-            expectedRaftPath: "",
-            shouldError:      true,
-        },
-        "no-config": {
-            config: &CoreConfig{
-                RawConfig: nil,
-            },
-            expectedRaftPath: "",
-            shouldError:      true,
-        },
-        "no-storage": {
-            config: &CoreConfig{
-                RawConfig: &server.Config{
-                    Storage: nil,
-                },
-            },
-            expectedRaftPath: "",
-            shouldError:      true,
-        },
-        "no-storage-config": {
-            config: &CoreConfig{
-                RawConfig: &server.Config{
-                    Storage: &server.Storage{
-                        Type:   "raft",
-                        Config: nil,
-                    },
-                },
-            },
-            expectedRaftPath: "",
-            shouldError:      true,
-        },
-        "no-storage-type": {
-            config: &CoreConfig{
-                RawConfig: &server.Config{
-                    Storage: &server.Storage{
-                        Type: "",
-                        Config: map[string]string{
-                            "path": testRaftPath,
-                        },
-                    },
-                },
-            },
-            expectedRaftPath: "",
-            shouldError:      true,
-        },
-        "no-raft-data-dir-path-in-config": {
-            config: &CoreConfig{
-                RawConfig: &server.Config{
-                    Storage: &server.Storage{
-                        Type:   "raft",
-                        Config: map[string]string{},
-                    },
-                },
-            },
-            expectedRaftPath: "",
-            shouldError:      true,
-        },
-    }
-
-    for name, tc := range testCases {
-        t.Run(name, func(t *testing.T) {
-            fmt.Println("name: ", name)
-            core, _, _ := TestCoreUnsealedWithConfig(t, tc.config)
-            raftPath, ok := core.RaftDataDirPath()
-            if tc.shouldError {
-                require.False(t, ok)
-                require.Empty(t, raftPath)
-            } else {
-                require.True(t, ok)
-                require.Equal(t, tc.expectedRaftPath, raftPath)
-            }
-        })
-    }
-}