k3s/pkg/agent/containerd/watcher.go
Brad Davidson 67291090ca Add support for conditional image tarball imports
Normally K3s will import all tarballs in the image dir on startup, and
re-import any tarballs that change while it is running.

This change allows users to opt into only importing tarballs that have
changed since they were last imported, even across restarts.

This behavior is opted into by touching a `.cache.json` file in the
images dir. This file is used to track the size and mtime of the image
files when they are imported.

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
2025-05-07 15:06:14 -07:00

333 lines
9.3 KiB
Go

package containerd
import (
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"strings"
"time"
containerd "github.com/containerd/containerd/v2/client"
"github.com/fsnotify/fsnotify"
"github.com/k3s-io/k3s/pkg/agent/cri"
"github.com/k3s-io/k3s/pkg/daemons/config"
pkgerrors "github.com/pkg/errors"
"github.com/rancher/wharfie/pkg/tarfile"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)
// fileInfo records the size and modification time of an image tarball as of
// the last time it was imported, so that unchanged files can be skipped on
// subsequent restarts. It is serialized to the .cache.json file by syncCache.
type fileInfo struct {
	Size    int64       `json:"size"`
	ModTime metav1.Time `json:"modTime"`
	seen    bool // field is not serialized, and can be used to track if a file has been seen since the last restart
}
// watchqueue ties together the fsnotify watcher on the images dir, the
// per-file import cache, and the delaying workqueue used to debounce and
// serialize processing of filesystem events.
type watchqueue struct {
	cfg        *config.Node
	watcher    *fsnotify.Watcher
	filesCache map[string]*fileInfo // keyed by file path, as received from watch events
	workqueue  workqueue.TypedDelayingInterface[string]
}
// createWatcher returns a new fsnotify watcher with the given path already
// added to its watch list.
func createWatcher(path string) (*fsnotify.Watcher, error) {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	if err = w.Add(path); err != nil {
		return nil, err
	}
	return w, nil
}
// mustCreateWatcher is like createWatcher but panics on failure. It is used
// when recreating a watcher after the event stream is lost, where there is no
// caller to return an error to.
func mustCreateWatcher(path string) *fsnotify.Watcher {
	watcher, err := createWatcher(path)
	if err != nil {
		// add a space after the colon so the wrapped error does not run into the message
		panic("Failed to create image import watcher: " + err.Error())
	}
	return watcher
}
// isFileSupported reports whether path has an extension handled by the image
// importer: any tarball extension supported by wharfie, or .txt image lists.
func isFileSupported(path string) bool {
	// check .txt separately instead of appending it to the shared
	// tarfile.SupportedExtensions slice - append could mutate the package-level
	// slice's backing array if it has spare capacity, and allocates on every call.
	if strings.HasSuffix(path, ".txt") {
		return true
	}
	for _, ext := range tarfile.SupportedExtensions {
		if strings.HasSuffix(path, ext) {
			return true
		}
	}
	return false
}
// runWorkerForImages connects to containerd and calls processNextEventForImages to process items from the workqueue.
// This blocks until the workqueue is shut down.
func (w *watchqueue) runWorkerForImages(ctx context.Context) {
	// Dial both endpoints once up front, rather than reconnecting for every event.
	ctrClient, err := Client(w.cfg.Containerd.Address)
	if err != nil {
		logrus.Errorf("Failed to create containerd client: %v", err)
		w.watcher.Close()
		return
	}
	defer ctrClient.Close()

	conn, err := cri.Connection(ctx, w.cfg.Containerd.Address)
	if err != nil {
		logrus.Errorf("Failed to create CRI connection: %v", err)
		w.watcher.Close()
		return
	}
	defer conn.Close()

	imageSvc := runtimeapi.NewImageServiceClient(conn)
	for {
		if !w.processNextEventForImages(ctx, ctrClient, imageSvc) {
			return
		}
	}
}
// processNextEventForImages retrieves a single event from the workqueue and processes it.
// It returns a boolean that is true if the workqueue is still open and this function should be called again.
func (w *watchqueue) processNextEventForImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) bool {
	key, shutdown := w.workqueue.Get()
	if shutdown {
		// the queue has been shut down; tell the caller to stop
		return false
	}
	err := w.processImageEvent(ctx, key, client, imageClient)
	if err != nil {
		logrus.Errorf("Failed to process image event: %v", err)
	}
	return true
}
// processImageEvent processes a single item from the workqueue.
//
// The key is a filesystem path from a watch event or directory listing.
// Directories are added to the watch and have their contents enqueued;
// supported image files are imported if they are new or appear to have
// changed since the last recorded import.
func (w *watchqueue) processImageEvent(ctx context.Context, key string, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) error {
	// always mark the key as done so the workqueue can accept it again later
	defer w.workqueue.Done(key)

	// Watch is rooted at the parent dir of the images dir, but we only need to handle things within the images dir
	if !strings.HasPrefix(key, w.cfg.Images) {
		return nil
	}

	file, err := os.Stat(key)
	// if the file does not exists, we assume that the event was RENAMED or REMOVED
	if os.IsNotExist(err) {
		// if the whole images dir was removed, reset the fileinfo cache
		if key == w.cfg.Images {
			w.filesCache = make(map[string]*fileInfo)
			defer w.syncCache()
			return nil
		}
		if !isFileSupported(key) {
			return nil
		}
		// drop the cache entry so the file will be re-imported if it is recreated
		delete(w.filesCache, key)
		defer w.syncCache()
		return nil
	} else if err != nil {
		return pkgerrors.Wrapf(err, "failed to get fileinfo for image event %s", key)
	}

	if file.IsDir() {
		// Add to watch and list+enqueue directory contents, as notify is not recursive
		if err := w.watcher.Add(key); err != nil {
			return pkgerrors.Wrapf(err, "failed to add watch of %s", key)
		}
		fileInfos, err := os.ReadDir(key)
		if err != nil {
			return pkgerrors.Wrapf(err, "unable to list contents of %s", key)
		}
		for _, fileInfo := range fileInfos {
			w.workqueue.Add(filepath.Join(key, fileInfo.Name()))
		}
		return nil
	}

	if !isFileSupported(key) {
		return nil
	}

	// Import if the file is not in the cache, or if both its size and mtime differ
	// from the cached values.
	// NOTE(review): the condition uses && - a touch that changes only the mtime,
	// or a change that preserves the size, will not trigger a re-import. Confirm
	// this is intentional and not meant to be ||.
	if lastFileState := w.filesCache[key]; lastFileState == nil || (file.Size() != lastFileState.Size && file.ModTime().After(lastFileState.ModTime.Time)) {
		start := time.Now()
		if err := preloadFile(ctx, w.cfg, client, imageClient, key); err != nil {
			return pkgerrors.Wrapf(err, "failed to import %s", key)
		}
		logrus.Infof("Imported images from %s in %s", key, time.Since(start))
		// record the file's current state so it is skipped until it changes again
		w.filesCache[key] = &fileInfo{Size: file.Size(), ModTime: metav1.NewTime(file.ModTime()), seen: true}
		defer w.syncCache()
	} else if lastFileState != nil && !lastFileState.seen {
		// mark the cached entry as seen so pruneCache does not discard it
		lastFileState.seen = true
		// no need to sync as the field is not serialized
	}
	return nil
}
// pruneCache removes entries for all files that have not been seen since the last restart,
// and syncs the cache to disk. This is done to ensure that the cache file does not grow without
// bounds by continuing to track files that do not exist.
func (w *watchqueue) pruneCache() {
	// collect stale paths first, then delete them
	var stale []string
	for path, state := range w.filesCache {
		if !state.seen {
			stale = append(stale, path)
		}
	}
	for _, path := range stale {
		delete(w.filesCache, path)
	}
	w.syncCache()
}
// syncCache writes the fileinfo cache to disk.
// if the cache file does not exist, this is a no-op. The file must be manually
// created by the user in order for the cache to be persisted across restarts.
func (w *watchqueue) syncCache() {
	cachePath := filepath.Join(w.cfg.Images, ".cache.json")
	// O_TRUNC without O_CREATE: only overwrite an existing file, never create one
	f, err := os.OpenFile(cachePath, os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		if !os.IsNotExist(err) {
			logrus.Errorf("Failed to truncate image import fileinfo cache: %v", err)
		}
		return
	}
	defer f.Close()

	data, err := json.Marshal(&w.filesCache)
	if err != nil {
		logrus.Errorf("Failed to marshal image import fileinfo cache: %v", err)
		return
	}
	if _, err = f.Write(data); err != nil {
		logrus.Errorf("Failed to write image import fileinfo cache: %v", err)
	}
}
// loadCache reads the fileinfo cache from disk.
// It is not an error if the cache file does not exist or is empty.
func (w *watchqueue) loadCache() {
	filePath := filepath.Join(w.cfg.Images, ".cache.json")
	// os.Open instead of os.OpenFile with O_RDONLY - the perm argument is
	// meaningless when opening an existing file for read.
	f, err := os.Open(filePath)
	if err != nil {
		if !os.IsNotExist(err) {
			logrus.Errorf("Failed to open image import fileinfo cache: %v", err)
		}
		return
	}
	defer f.Close()
	b, err := io.ReadAll(f)
	if err != nil {
		logrus.Errorf("Failed to read image import fileinfo cache: %v", err)
		return
	}
	// 0 byte file is fine, but don't try to load it - allows users to simply
	// touch the file to enable it for use.
	if len(b) == 0 {
		return
	}
	if err := json.Unmarshal(b, &w.filesCache); err != nil {
		logrus.Errorf("Failed to unmarshal image import fileinfo cache: %v", err)
	}
}
// importAndWatchImages starts the image watcher and workqueue.
// This function blocks until the workqueue is empty, indicating that all images
// that currently exist have been imported, or until the context is cancelled.
func importAndWatchImages(ctx context.Context, cfg *config.Node) error {
	w, err := watchImages(ctx, cfg)
	if err != nil {
		return err
	}
	// Add images dir to workqueue; if it exists and contains images
	// they will be recursively listed and enqueued.
	w.workqueue.Add(cfg.Images)
	// wait for the workqueue to empty before returning
	for w.workqueue.Len() > 0 {
		logrus.Debugf("Waiting for initial import of images from %s", cfg.Images)
		// honor context cancellation instead of sleeping unconditionally, so
		// startup is not blocked forever if the agent is shutting down
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second * 2):
		}
	}
	// prune unseen entries from last run once all existing files have been processed
	w.pruneCache()
	return nil
}
// watchImages starts a watcher on the parent of the images dir, and a workqueue to process events
// from the watch stream.
func watchImages(ctx context.Context, cfg *config.Node) (*watchqueue, error) {
	// watch the directory above the images dir, as it may not exist yet when the watch is started.
	watchDir := filepath.Dir(cfg.Images)
	watcher, err := createWatcher(watchDir)
	if err != nil {
		return nil, pkgerrors.Wrapf(err, "failed to create image import watcher for %s", watchDir)
	}

	w := &watchqueue{
		cfg:        cfg,
		watcher:    watcher,
		filesCache: make(map[string]*fileInfo),
		workqueue:  workqueue.TypedNewDelayingQueue[string](),
	}
	logrus.Debugf("Image import watcher created")
	w.loadCache()

	// close the watcher when the context is cancelled
	go func() {
		<-ctx.Done()
		w.watcher.Close()
	}()

	go w.runWorkerForImages(ctx)

	go func() {
		for {
			select {
			case event, ok := <-w.watcher.Events:
				if !ok {
					logrus.Info("Image import watcher event channel closed; retrying in 5 seconds")
					select {
					case <-time.After(time.Second * 5):
						w.watcher = mustCreateWatcher(watchDir)
					case <-ctx.Done():
						return
					}
					// event is the zero value when the channel is closed;
					// don't fall through and try to enqueue it
					continue
				}
				// only enqueue event if it is for a path within the images dir - not the parent dir that we are watching
				if strings.HasPrefix(event.Name, cfg.Images) {
					w.workqueue.AddAfter(event.Name, time.Second*2)
				}
			case err, ok := <-w.watcher.Errors:
				if !ok {
					logrus.Info("Image import watcher error channel closed; retrying in 5 seconds")
					select {
					case <-time.After(time.Second * 5):
						w.watcher = mustCreateWatcher(watchDir)
					case <-ctx.Done():
						return
					}
					// err is nil when the channel is closed; don't fall
					// through and log a spurious nil error
					continue
				}
				logrus.Errorf("Image import watcher received an error: %v", err)
			}
		}
	}()

	return w, nil
}