diff --git a/backend/remote-state/consul/client.go b/backend/remote-state/consul/client.go index 43dbdce5af..9bbfb49fa1 100644 --- a/backend/remote-state/consul/client.go +++ b/backend/remote-state/consul/client.go @@ -72,7 +72,9 @@ func (c *RemoteClient) Get() (*remote.Payload, error) { c.mu.Lock() defer c.mu.Unlock() - pair, _, err := c.Client.KV().Get(c.Path, nil) + kv := c.Client.KV() + + chunked, hash, chunks, pair, err := c.chunkedMode() if err != nil { return nil, err } @@ -82,17 +84,36 @@ func (c *RemoteClient) Get() (*remote.Payload, error) { c.modifyIndex = pair.ModifyIndex - payload := pair.Value + var payload []byte + if chunked { + for _, c := range chunks { + pair, _, err := kv.Get(c, nil) + if err != nil { + return nil, err + } + if pair == nil { + return nil, fmt.Errorf("Key %q could not be found", c) + } + payload = append(payload, pair.Value[:]...) + } + } else { + payload = pair.Value + } + // If the payload starts with 0x1f, it's gzip, not json - if len(pair.Value) >= 1 && pair.Value[0] == '\x1f' { - if data, err := uncompressState(pair.Value); err == nil { - payload = data - } else { + if len(payload) >= 1 && payload[0] == '\x1f' { + payload, err = uncompressState(payload) + if err != nil { return nil, err } } - md5 := md5.Sum(pair.Value) + md5 := md5.Sum(payload) + + if hash != "" && fmt.Sprintf("%x", md5) != hash { + return nil, fmt.Errorf("The remote state does not match the expected hash") + } + return &remote.Payload{ Data: payload, MD5: md5[:], @@ -100,9 +121,65 @@ func (c *RemoteClient) Get() (*remote.Payload, error) { } func (c *RemoteClient) Put(data []byte) error { + // The state can be stored in 4 different ways, based on the payload size + // and whether the user enabled gzip: + // - single entry mode with plain JSON: a single JSON is stored at + // "tfstate/my_project" + // - single entry mode gzip: the JSON payload is first gziped and stored at + // "tfstate/my_project" + // - chunked mode with plain JSON: the JSON payload is split in pieces and + // stored like so: + // - "tfstate/my_project" -> a JSON payload that contains the path of + // the chunks and an MD5 sum like so: + // { + // "current-hash": "abcdef1234", + // "chunks": [ + // "tfstate/my_project/tfstate.abcdef1234/0", + // "tfstate/my_project/tfstate.abcdef1234/1", + // "tfstate/my_project/tfstate.abcdef1234/2", + // ] + // } + // - "tfstate/my_project/tfstate.abcdef1234/0" -> The first chunk + // - "tfstate/my_project/tfstate.abcdef1234/1" -> The next one + // - ... + // - chunked mode with gzip: the same system but we gziped the JSON payload + // before splitting it in chunks + // + // When overwritting the current state, we need to clean the old chunks if + // we were in chunked mode (no matter whether we need to use chunks for the + // new one). To do so based on the 4 possibilities above we look at the + // value at "tfstate/my_project" and if it is: + // - absent then it's a new state and there will be nothing to cleanup, + // - not a JSON payload we were in single entry mode with gzip so there will + // be nothing to cleanup + // - a JSON payload, then we were either single entry mode with plain JSON + // or in chunked mode. To differentiate between the two we look whether a + // "current-hash" key is present in the payload. If we find one we were + // in chunked mode and we will need to remove the old chunks (whether or + // not we were using gzip does not matter in that case). + c.mu.Lock() defer c.mu.Unlock() + kv := c.Client.KV() + + // First we determine what mode we were using and to prepare the cleanup + chunked, hash, _, _, err := c.chunkedMode() + if err != nil { + return err + } + cleanupOldChunks := func() {} + if chunked { + cleanupOldChunks = func() { + // We ignore all errors that can happen here because we already + // saved the new state and there is no way to return a warning to + // the user. We may end up with dangling chunks but there is no way + // to be sure we won't. + path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash) + kv.DeleteTree(path, nil) + } + } + payload := data if c.GZip { if compressedState, err := compressState(data); err == nil { @@ -112,8 +189,6 @@ func (c *RemoteClient) Put(data []byte) error { } } - kv := c.Client.KV() - // default to doing a CAS verb := consulapi.KVCAS @@ -123,9 +198,44 @@ func (c *RemoteClient) Put(data []byte) error { verb = consulapi.KVSet } + // If the payload is too large we first write the chunks and replace it + // 524288 is the default value, we just hope the user did not set a smaller + // one but there is really no reason for them to do so, if they changed it + // it is certainly to set a larger value. + limit := 524288 + if len(payload) > limit { + md5 := md5.Sum(data) + chunks := split(payload, limit) + chunkPaths := make([]string, 0) + + // First we write the new chunks + for i, p := range chunks { + path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%x/%d", md5, i) + chunkPaths = append(chunkPaths, path) + _, err := kv.Put(&consulapi.KVPair{ + Key: path, + Value: p, + }, nil) + + if err != nil { + return err + } + } + + // We update the link to point to the new chunks + payload, err = json.Marshal(map[string]interface{}{ + "current-hash": fmt.Sprintf("%x", md5), + "chunks": chunkPaths, + }) + if err != nil { + return err + } + } + + var txOps consulapi.KVTxnOps // KV.Put doesn't return the new index, so we use a single operation // transaction to get the new index with a single request. - txOps := consulapi.KVTxnOps{ + txOps = consulapi.KVTxnOps{ &consulapi.KVTxnOp{ Verb: verb, Key: c.Path, @@ -138,7 +248,6 @@ func (c *RemoteClient) Put(data []byte) error { if err != nil { return err } - // transaction was rolled back if !ok { return fmt.Errorf("consul CAS failed with transaction errors: %v", resp.Errors) @@ -150,6 +259,10 @@ func (c *RemoteClient) Put(data []byte) error { } c.modifyIndex = resp.Results[0].ModifyIndex + + // We remove all the old chunks + cleanupOldChunks() + return nil } @@ -158,7 +271,20 @@ func (c *RemoteClient) Delete() error { defer c.mu.Unlock() kv := c.Client.KV() - _, err := kv.Delete(c.Path, nil) + + chunked, hash, _, _, err := c.chunkedMode() + if err != nil { + return err + } + + _, err = kv.Delete(c.Path, nil) + + // If there were chunks we need to remove them + if chunked { + path := strings.TrimRight(c.Path, "/") + fmt.Sprintf("/tfstate.%s/", hash) + kv.DeleteTree(path, nil) + } + return err } @@ -473,3 +599,42 @@ func uncompressState(data []byte) ([]byte, error) { } return b.Bytes(), nil } + +func split(payload []byte, limit int) [][]byte { + var chunk []byte + chunks := make([][]byte, 0, len(payload)/limit+1) + for len(payload) >= limit { + chunk, payload = payload[:limit], payload[limit:] + chunks = append(chunks, chunk) + } + if len(payload) > 0 { + chunks = append(chunks, payload[:]) + } + return chunks +} + +func (c *RemoteClient) chunkedMode() (bool, string, []string, *consulapi.KVPair, error) { + kv := c.Client.KV() + pair, _, err := kv.Get(c.Path, nil) + if err != nil { + return false, "", nil, pair, err + } + if pair != nil { + var d map[string]interface{} + err = json.Unmarshal(pair.Value, &d) + // If there is an error when unmarshaling the payload, the state has + // probably been gziped in single entry mode. + if err == nil { + // If we find the "current-hash" key we were in chunked mode + hash, ok := d["current-hash"] + if ok { + chunks := make([]string, 0) + for _, c := range d["chunks"].([]interface{}) { + chunks = append(chunks, c.(string)) + } + return true, hash.(string), chunks, pair, nil + } + } + } + return false, "", nil, pair, nil +} diff --git a/backend/remote-state/consul/client_test.go b/backend/remote-state/consul/client_test.go index ff38f01204..a951147dc7 100644 --- a/backend/remote-state/consul/client_test.go +++ b/backend/remote-state/consul/client_test.go @@ -1,9 +1,14 @@ package consul import ( + "bytes" "context" + "encoding/json" "fmt" + "math/rand" "net" + "reflect" + "strings" "sync" "testing" "time" @@ -80,6 +85,140 @@ func TestRemoteClient_gzipUpgrade(t *testing.T) { remote.TestClient(t, state.(*remote.State).Client) } +// TestConsul_largeState tries to write a large payload using the Consul state +// manager, as there is a limit to the size of the values in the KV store it +// will need to be split up before being saved and put back together when read. +func TestConsul_largeState(t *testing.T) { + path := "tf-unit/test-large-state" + + b := backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{ + "address": srv.HTTPAddr, + "path": path, + })) + + s, err := b.StateMgr(backend.DefaultStateName) + if err != nil { + t.Fatal(err) + } + + c := s.(*remote.State).Client.(*RemoteClient) + c.Path = path + + // testPaths fails the test if the keys found at the prefix don't match + // what is expected + testPaths := func(t *testing.T, expected []string) { + kv := c.Client.KV() + pairs, _, err := kv.List(c.Path, nil) + if err != nil { + t.Fatal(err) + } + res := make([]string, 0) + for _, p := range pairs { + res = append(res, p.Key) + } + if !reflect.DeepEqual(res, expected) { + t.Fatalf("Wrong keys: %#v", res) + } + } + + testPayload := func(t *testing.T, data map[string]string, keys []string) { + payload, err := json.Marshal(data) + if err != nil { + t.Fatal(err) + } + err = c.Put(payload) + if err != nil { + t.Fatal("could not put payload", err) + } + + remote, err := c.Get() + if err != nil { + t.Fatal(err) + } + + // md5 := md5.Sum(payload) + // if !bytes.Equal(md5[:], remote.MD5) { + // t.Fatal("the md5 sums do not match") + // } + + if !bytes.Equal(payload, remote.Data) { + t.Fatal("the data do not match") + } + + testPaths(t, keys) + } + + // The default limit for the size of the value in Consul is 524288 bytes + testPayload( + t, + map[string]string{ + "foo": strings.Repeat("a", 524288+2), + }, + []string{ + "tf-unit/test-large-state", + "tf-unit/test-large-state/tfstate.2cb96f52c9fff8e0b56cb786ec4d2bed/0", + "tf-unit/test-large-state/tfstate.2cb96f52c9fff8e0b56cb786ec4d2bed/1", + }, + ) + + // We try to replace the payload with a small one, the old chunks should be removed + testPayload( + t, + map[string]string{"var": "a"}, + []string{"tf-unit/test-large-state"}, + ) + + // Test with gzip and chunks + b = backend.TestBackendConfig(t, New(), backend.TestWrapConfig(map[string]interface{}{ + "address": srv.HTTPAddr, + "path": path, + "gzip": true, + })) + + s, err = b.StateMgr(backend.DefaultStateName) + if err != nil { + t.Fatal(err) + } + + c = s.(*remote.State).Client.(*RemoteClient) + c.Path = path + + // We need a long random string so it results in multiple chunks even after + // being gziped + + // We use a fixed seed so the test can be reproductible + rand.Seed(1234) + RandStringRunes := func(n int) string { + var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) + } + + testPayload( + t, + map[string]string{ + "bar": RandStringRunes(5 * (524288 + 2)), + }, + []string{ + "tf-unit/test-large-state", + "tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/0", + "tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/1", + "tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/2", + "tf-unit/test-large-state/tfstate.58e8160335864b520b1cc7f2222a4019/3", + }, + ) + + // Deleting the state should remove all chunks + err = c.Delete() + if err != nil { + t.Fatal(err) + } + testPaths(t, []string{}) +} + func TestConsul_stateLock(t *testing.T) { testCases := []string{ fmt.Sprintf("tf-unit/%s", time.Now().String()),