Merge pull request #133988 from bart0sh/PR199-migrate-kubelet-certificate-to-contextual-logging

migrate kubelet/certificate to contextual logging
This commit is contained in:
Kubernetes Prow Robot 2025-09-16 02:24:12 -07:00 committed by GitHub
commit b40d570248
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 50 additions and 35 deletions

View file

@ -954,7 +954,7 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp
// bootstrap the cert manager with the contents of the initial client config.
logger.Info("Client rotation is on, will bootstrap in background")
certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
certConfig, clientConfig, err := bootstrap.LoadClientConfig(logger, s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
if err != nil {
return nil, nil, err
}
@ -992,7 +992,7 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp
// we set exitAfter to five minutes because we use this client configuration to request new certs - if we are unable
// to request new certs, we will be unable to continue normal operation. Exiting the process allows a wrapper
// or the bootstrapping credentials to potentially lay down new initial config.
closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
closeAllConns, err := kubeletcertificate.UpdateTransport(logger, wait.NeverStop, transportConfig, clientCertificateManager, 5*time.Minute)
if err != nil {
return nil, nil, err
}

View file

@ -216,6 +216,7 @@ linters:
contextual k8s.io/kubernetes/pkg/security/.*
contextual k8s.io/kubernetes/pkg/securitycontext/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/certificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*

View file

@ -230,6 +230,7 @@ linters:
contextual k8s.io/kubernetes/pkg/security/.*
contextual k8s.io/kubernetes/pkg/securitycontext/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/certificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*

View file

@ -62,6 +62,7 @@ contextual k8s.io/kubernetes/pkg/scheduler/.*
contextual k8s.io/kubernetes/pkg/security/.*
contextual k8s.io/kubernetes/pkg/securitycontext/.*
contextual k8s.io/kubernetes/test/e2e/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/certificate/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/dra/.*
contextual k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/.*
contextual k8s.io/kubernetes/pkg/kubelet/lifecycle/.*

View file

@ -56,13 +56,13 @@ const tmpPrivateKeyFile = "kubelet-client.key.tmp"
// kubeconfigPath on disk is populated based on bootstrapPath but pointing to the location of the client cert
// in certDir. This preserves the historical behavior of bootstrapping where on subsequent restarts the
// most recent client cert is used to request new client certs instead of the initial token.
func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig, userConfig *restclient.Config, err error) {
func LoadClientConfig(logger klog.Logger, kubeconfigPath, bootstrapPath, certDir string) (certConfig, userConfig *restclient.Config, err error) {
if len(bootstrapPath) == 0 {
clientConfig, err := loadRESTClientConfig(kubeconfigPath)
if err != nil {
return nil, nil, fmt.Errorf("unable to load kubeconfig: %v", err)
}
klog.V(2).InfoS("No bootstrapping requested, will use kubeconfig")
logger.V(2).Info("No bootstrapping requested, will use kubeconfig")
return clientConfig, restclient.CopyConfig(clientConfig), nil
}
@ -82,7 +82,7 @@ func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig
if err != nil {
return nil, nil, fmt.Errorf("unable to load kubeconfig: %v", err)
}
klog.V(2).InfoS("Current kubeconfig file contents are still valid, no bootstrap necessary")
logger.V(2).Info("Current kubeconfig file contents are still valid, no bootstrap necessary")
return clientConfig, restclient.CopyConfig(clientConfig), nil
}
@ -98,7 +98,7 @@ func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig
if err := writeKubeconfigFromBootstrapping(clientConfig, kubeconfigPath, pemPath); err != nil {
return nil, nil, err
}
klog.V(2).InfoS("Use the bootstrap credentials to request a cert, and set kubeconfig to point to the certificate dir")
logger.V(2).Info("Use the bootstrap credentials to request a cert, and set kubeconfig to point to the certificate dir")
return bootstrapClientConfig, clientConfig, nil
}
@ -107,17 +107,18 @@ func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig
// On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath.
// The certificate and key file are stored in certDir.
func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error {
logger := klog.FromContext(ctx)
// Short-circuit if the kubeconfig file exists and is valid.
ok, err := isClientConfigStillValid(kubeconfigPath)
if err != nil {
return err
}
if ok {
klog.V(2).InfoS("Kubeconfig exists and is valid, skipping bootstrap", "path", kubeconfigPath)
logger.V(2).Info("Kubeconfig exists and is valid, skipping bootstrap", "path", kubeconfigPath)
return nil
}
klog.V(2).InfoS("Using bootstrap kubeconfig to generate TLS client cert, key and kubeconfig file")
logger.V(2).Info("Using bootstrap kubeconfig to generate TLS client cert, key and kubeconfig file")
bootstrapClientConfig, err := loadRESTClientConfig(bootstrapPath)
if err != nil {
@ -148,7 +149,7 @@ func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir
// managed by the store.
privKeyPath := filepath.Join(certDir, tmpPrivateKeyFile)
if !verifyKeyData(keyData) {
klog.V(2).InfoS("No valid private key and/or certificate found, reusing existing private key or creating a new one")
logger.V(2).Info("No valid private key and/or certificate found, reusing existing private key or creating a new one")
// Note: always call LoadOrGenerateKeyFile so that private key is
// reused on next startup if CSR request fails.
keyData, _, err = keyutil.LoadOrGenerateKeyFile(privKeyPath)
@ -158,7 +159,7 @@ func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir
}
if err := waitForServer(ctx, *bootstrapClientConfig, 1*time.Minute); err != nil {
klog.InfoS("Error waiting for apiserver to come up", "err", err)
logger.Info("Error waiting for apiserver to come up", "err", err)
}
certData, err := requestNodeCertificate(ctx, bootstrapClient, keyData, nodeName)
@ -169,7 +170,7 @@ func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir
return err
}
if err := os.Remove(privKeyPath); err != nil && !os.IsNotExist(err) {
klog.V(2).InfoS("Failed cleaning up private key file", "path", privKeyPath, "err", err)
logger.V(2).Info("Failed cleaning up private key file", "path", privKeyPath, "err", err)
}
return writeKubeconfigFromBootstrapping(bootstrapClientConfig, kubeconfigPath, store.CurrentPath())
@ -293,7 +294,7 @@ func waitForServer(ctx context.Context, cfg restclient.Config, deadline time.Dur
var connected bool
wait.JitterUntil(func() {
if _, err := cli.Get().AbsPath("/healthz").Do(ctx).Raw(); err != nil {
klog.InfoS("Failed to connect to apiserver", "err", err)
klog.FromContext(ctx).Info("Failed to connect to apiserver", "err", err)
return
}
cancel()
@ -355,7 +356,7 @@ func requestNodeCertificate(ctx context.Context, client clientset.Interface, pri
ctx, cancel := context.WithTimeout(ctx, 3600*time.Second)
defer cancel()
klog.V(2).InfoS("Waiting for client certificate to be issued")
klog.FromContext(ctx).V(2).Info("Waiting for client certificate to be issued")
return csr.WaitForCertificate(ctx, client, reqName, reqUID)
}

View file

@ -17,7 +17,6 @@ limitations under the License.
package bootstrap
import (
"context"
"fmt"
"io"
"os"
@ -26,6 +25,7 @@ import (
"testing"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/test/utils/ktesting"
"github.com/google/go-cmp/cmp"
certificatesv1 "k8s.io/api/certificates/v1"
@ -273,9 +273,10 @@ users:
},
}
logger, _ := ktesting.NewTestContext(t)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
certConfig, clientConfig, err := LoadClientConfig(test.kubeconfigPath, test.bootstrapPath, test.certDir)
certConfig, clientConfig, err := LoadClientConfig(logger, test.kubeconfigPath, test.bootstrapPath, test.certDir)
if err != nil {
t.Fatal(err)
}
@ -348,7 +349,8 @@ users:
}
func TestRequestNodeCertificateNoKeyData(t *testing.T) {
certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), []byte{}, "fake-node-name")
tCtx := ktesting.Init(t)
certData, err := requestNodeCertificate(tCtx, newClientset(fakeClient{}), []byte{}, "fake-node-name")
if err == nil {
t.Errorf("Got no error, wanted error an error because there was an empty private key passed in.")
}
@ -358,6 +360,7 @@ func TestRequestNodeCertificateNoKeyData(t *testing.T) {
}
func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) {
tCtx := ktesting.Init(t)
client := newClientset(fakeClient{
failureType: createError,
})
@ -366,7 +369,7 @@ func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) {
t.Fatalf("Unable to generate a new private key: %v", err)
}
certData, err := requestNodeCertificate(context.TODO(), client, privateKeyData, "fake-node-name")
certData, err := requestNodeCertificate(tCtx, client, privateKeyData, "fake-node-name")
if err == nil {
t.Errorf("Got no error, wanted error an error because client.Create failed.")
}
@ -376,12 +379,13 @@ func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) {
}
func TestRequestNodeCertificate(t *testing.T) {
tCtx := ktesting.Init(t)
privateKeyData, err := keyutil.MakeEllipticPrivateKeyPEM()
if err != nil {
t.Fatalf("Unable to generate a new private key: %v", err)
}
certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), privateKeyData, "fake-node-name")
certData, err := requestNodeCertificate(tCtx, newClientset(fakeClient{}), privateKeyData, "fake-node-name")
if err != nil {
t.Errorf("Got %v, wanted no error.", err)
}

View file

@ -282,14 +282,17 @@ type kubeletServerCertificateDynamicFileManager struct {
// Enqueue implements the callback that is notified when the serving cert content changes.
func (m *kubeletServerCertificateDynamicFileManager) Enqueue() {
// Use klog.TODO() because we currently do not have a proper logger to pass in.
// Replace this with an appropriate logger when refactoring this function to accept a logger parameter.
logger := klog.TODO()
certContent, keyContent := m.dynamicCertificateContent.CurrentCertKeyContent()
cert, err := tls.X509KeyPair(certContent, keyContent)
if err != nil {
klog.ErrorS(err, "invalid certificate and key pair from file", "certFile", m.certFile, "keyFile", m.keyFile)
logger.Error(err, "invalid certificate and key pair from file", "certFile", m.certFile, "keyFile", m.keyFile)
return
}
m.currentTLSCertificate.Store(&cert)
klog.V(4).InfoS("loaded certificate and key pair in kubelet server certificate manager", "certFile", m.certFile, "keyFile", m.keyFile)
logger.V(4).Info("loaded certificate and key pair in kubelet server certificate manager", "certFile", m.certFile, "keyFile", m.keyFile)
}
// Current returns the last valid certificate key pair loaded from files.
@ -300,7 +303,9 @@ func (m *kubeletServerCertificateDynamicFileManager) Current() *tls.Certificate
// Start starts watching the certificate and key files
func (m *kubeletServerCertificateDynamicFileManager) Start() {
var ctx context.Context
ctx, m.cancelFn = context.WithCancel(context.Background())
// Use context.TODO() because we currently do not have a proper context to pass in.
// This should be replaced with an appropriate context when refactoring this function to accept a context parameter.
ctx, m.cancelFn = context.WithCancel(context.TODO())
go m.dynamicCertificateContent.Run(ctx, 1)
}

View file

@ -55,13 +55,13 @@ import (
//
// stopCh should be used to indicate when the transport is unused and doesn't need
// to continue checking the manager.
func UpdateTransport(stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
return updateTransport(stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitAfter)
func UpdateTransport(logger klog.Logger, stopCh <-chan struct{}, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
return updateTransport(logger, stopCh, 10*time.Second, clientConfig, clientCertificateManager, exitAfter)
}
// updateTransport is an internal function that exposes how often it checks whether the
// client cert has changed.
func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
func updateTransport(logger klog.Logger, stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration) (func(), error) {
if clientConfig.Transport != nil || clientConfig.Dial != nil {
return nil, fmt.Errorf("there is already a transport or dialer configured")
}
@ -69,7 +69,7 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
d := connrotation.NewDialer((&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext)
if clientCertificateManager != nil {
if err := addCertRotation(stopCh, period, clientConfig, clientCertificateManager, exitAfter, d); err != nil {
if err := addCertRotation(logger, stopCh, period, clientConfig, clientCertificateManager, exitAfter, d); err != nil {
return nil, err
}
} else {
@ -79,7 +79,7 @@ func updateTransport(stopCh <-chan struct{}, period time.Duration, clientConfig
return d.CloseAll, nil
}
func addCertRotation(stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration, d *connrotation.Dialer) error {
func addCertRotation(logger klog.Logger, stopCh <-chan struct{}, period time.Duration, clientConfig *restclient.Config, clientCertificateManager certificate.Manager, exitAfter time.Duration, d *connrotation.Dialer) error {
tlsConfig, err := restclient.TLSConfigFor(clientConfig)
if err != nil {
return fmt.Errorf("unable to configure TLS for the rest client: %v", err)
@ -117,20 +117,20 @@ func addCertRotation(stopCh <-chan struct{}, period time.Duration, clientConfig
// the certificate has been deleted from disk or is otherwise corrupt
if now.After(lastCertAvailable.Add(exitAfter)) {
if clientCertificateManager.ServerHealthy() {
klog.ErrorS(nil, "No valid client certificate is found and the server is responsive, exiting.", "lastCertificateAvailabilityTime", lastCertAvailable, "shutdownThreshold", exitAfter)
logger.Error(nil, "No valid client certificate is found and the server is responsive, exiting.", "lastCertificateAvailabilityTime", lastCertAvailable, "shutdownThreshold", exitAfter)
os.Exit(1)
} else {
klog.ErrorS(nil, "No valid client certificate is found but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", "lastCertificateAvailabilityTime", lastCertAvailable, "shutdownThreshold", exitAfter)
logger.Error(nil, "No valid client certificate is found but the server is not responsive. A restart may be necessary to retrieve new initial credentials.", "lastCertificateAvailabilityTime", lastCertAvailable, "shutdownThreshold", exitAfter)
}
}
} else {
// the certificate is expired
if now.After(curr.Leaf.NotAfter) {
if clientCertificateManager.ServerHealthy() {
klog.ErrorS(nil, "The currently active client certificate has expired and the server is responsive, exiting.")
logger.Error(nil, "The currently active client certificate has expired and the server is responsive, exiting.")
os.Exit(1)
} else {
klog.ErrorS(nil, "The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
logger.Error(nil, "The currently active client certificate has expired, but the server is not responsive. A restart may be necessary to retrieve new initial credentials.")
}
}
lastCertAvailable = now
@ -144,7 +144,7 @@ func addCertRotation(stopCh <-chan struct{}, period time.Duration, clientConfig
lastCert = curr
hasCert.Store(lastCert != nil)
klog.InfoS("Certificate rotation detected, shutting down client connections to start using new credentials")
logger.Info("Certificate rotation detected, shutting down client connections to start using new credentials")
// The cert has been rotated. Close all existing connections to force the client
// to reperform its TLS handshake with new cert.
//

View file

@ -17,7 +17,6 @@ limitations under the License.
package certificate
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
@ -33,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/test/utils/ktesting"
)
var (
@ -146,6 +146,8 @@ func TestRotateShutsDownConnections(t *testing.T) {
// This test fails if you comment out the t.closeAllConns() call in
// transport.go and don't close connections on a rotate.
logger, tCtx := ktesting.NewTestContext(t)
stop := make(chan struct{})
defer close(stop)
@ -191,7 +193,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
}
// Check for a new cert every 10 milliseconds
if _, err := updateTransport(stop, 10*time.Millisecond, c, m, 0); err != nil {
if _, err := updateTransport(logger, stop, 10*time.Millisecond, c, m, 0); err != nil {
t.Fatal(err)
}
@ -200,7 +202,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
t.Fatal(err)
}
if err := client.Get().Do(context.TODO()).Error(); err != nil {
if err := client.Get().Do(tCtx).Error(); err != nil {
t.Fatal(err)
}
firstCertSerial := lastSerialNumber()
@ -210,7 +212,7 @@ func TestRotateShutsDownConnections(t *testing.T) {
m.setCurrent(client2CertData.certificate)
err = wait.PollImmediate(time.Millisecond*50, wait.ForeverTestTimeout, func() (done bool, err error) {
client.Get().Do(context.TODO())
client.Get().Do(tCtx)
if firstCertSerial.Cmp(lastSerialNumber()) != 0 {
// The certificate changed!
return true, nil