Merge pull request #18041 from matt-gp/aws-sd-msk-optimisations

AWS SD: Optimise MSK Role
This commit is contained in:
George Krajcsovits 2026-02-10 12:17:09 +01:00 committed by GitHub
commit 246341b169
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 180 additions and 97 deletions

View file

@ -34,6 +34,7 @@ import (
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/refresh"
@ -87,9 +88,10 @@ const (
// DefaultMSKSDConfig is the default MSK SD configuration.
var DefaultMSKSDConfig = MSKSDConfig{
	// Port is the default port attached to discovered targets.
	Port: 80,
	// RefreshInterval is how often discovery is re-run.
	RefreshInterval: model.Duration(60 * time.Second),
	// RequestConcurrency caps the number of MSK API requests issued in
	// parallel by describeClusters and listNodes.
	RequestConcurrency: 10,
	HTTPClientConfig:   config.DefaultHTTPClientConfig,
}
func init() {
@ -108,7 +110,8 @@ type MSKSDConfig struct {
Port int `yaml:"port"`
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
RequestConcurrency int `yaml:"request_concurrency,omitempty"`
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
}
// NewDiscovererMetrics implements discovery.Config.
@ -247,39 +250,33 @@ func (d *MSKDiscovery) initMskClient(ctx context.Context) error {
return nil
}
// describeClusters describes the clusters with the given ARNs and returns their details.
func (d *MSKDiscovery) describeClusters(ctx context.Context, clusterARNs []string) ([]types.Cluster, error) {
var (
clusters []types.Cluster
wg sync.WaitGroup
mu sync.Mutex
errs []error
)
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
for _, clusterARN := range clusterARNs {
wg.Add(1)
go func(clusterARN string) {
defer wg.Done()
cluster, err := d.msk.DescribeClusterV2(ctx, &kafka.DescribeClusterV2Input{
errg.Go(func() error {
cluster, err := d.msk.DescribeClusterV2(ectx, &kafka.DescribeClusterV2Input{
ClusterArn: aws.String(clusterARN),
})
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("could not describe cluster %v: %w", clusterARN, err))
mu.Unlock()
return
return fmt.Errorf("could not describe cluster %v: %w", clusterARN, err)
}
mu.Lock()
clusters = append(clusters, *cluster.ClusterInfo)
mu.Unlock()
}(clusterARN)
}
wg.Wait()
if len(errs) > 0 {
return nil, fmt.Errorf("errors occurred while describing clusters: %v", errs)
return nil
})
}
return clusters, nil
return clusters, errg.Wait()
}
// listClusters lists all MSK clusters in the configured region and returns their details.
func (d *MSKDiscovery) listClusters(ctx context.Context) ([]types.Cluster, error) {
var (
clusters []types.Cluster
@ -307,29 +304,42 @@ func (d *MSKDiscovery) listClusters(ctx context.Context) ([]types.Cluster, error
return clusters, nil
}
func (d *MSKDiscovery) listNodes(ctx context.Context, clusterARN string) ([]types.NodeInfo, error) {
var (
nodes []types.NodeInfo
nextToken *string
)
for {
resp, err := d.msk.ListNodes(ctx, &kafka.ListNodesInput{
ClusterArn: aws.String(clusterARN),
MaxResults: aws.Int32(100),
NextToken: nextToken,
})
if err != nil {
return nil, fmt.Errorf("could not list nodes for cluster %v: %w", clusterARN, err)
}
// listNodes lists all nodes for the given clusters and returns a map of cluster ARN to its nodes.
func (d *MSKDiscovery) listNodes(ctx context.Context, clusters []types.Cluster) (map[string][]types.NodeInfo, error) {
clusterNodeMap := make(map[string][]types.NodeInfo)
mu := sync.Mutex{}
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
for _, cluster := range clusters {
clusterARN := aws.ToString(cluster.ClusterArn)
errg.Go(func() error {
var clusterNodes []types.NodeInfo
var nextToken *string
for {
resp, err := d.msk.ListNodes(ectx, &kafka.ListNodesInput{
ClusterArn: aws.String(clusterARN),
MaxResults: aws.Int32(100),
NextToken: nextToken,
})
if err != nil {
return fmt.Errorf("could not list nodes for cluster %v: %w", clusterARN, err)
}
nodes = append(nodes, resp.NodeInfoList...)
if resp.NextToken == nil {
break
}
nextToken = resp.NextToken
clusterNodes = append(clusterNodes, resp.NodeInfoList...)
if resp.NextToken == nil {
break
}
nextToken = resp.NextToken
}
mu.Lock()
clusterNodeMap[clusterARN] = clusterNodes
mu.Unlock()
return nil
})
}
return nodes, nil
return clusterNodeMap, errg.Wait()
}
func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
@ -355,21 +365,20 @@ func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error
}
}
clusterNodeMap, err := d.listNodes(ctx, clusters)
if err != nil {
return nil, err
}
var (
targetsMu sync.Mutex
wg sync.WaitGroup
)
for _, cluster := range clusters {
wg.Add(1)
go func(cluster types.Cluster) {
go func(cluster types.Cluster, nodes []types.NodeInfo) {
defer wg.Done()
nodes, err := d.listNodes(ctx, aws.ToString(cluster.ClusterArn))
if err != nil {
d.logger.Error("Failed to list nodes", "cluster", aws.ToString(cluster.ClusterName), "error", err)
return
}
for _, node := range nodes {
labels := model.LabelSet{
mskLabelClusterName: model.LabelValue(aws.ToString(cluster.ClusterName)),
@ -425,7 +434,7 @@ func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error
continue
}
}
}(cluster)
}(cluster, clusterNodeMap[aws.ToString(cluster.ClusterArn)])
}
wg.Wait()

View file

@ -218,7 +218,8 @@ func TestMSKDiscoveryDescribeClusters(t *testing.T) {
d := &MSKDiscovery{
msk: client,
cfg: &MSKSDConfig{
Region: tt.mskData.region,
Region: tt.mskData.region,
RequestConcurrency: 10,
},
}
@ -242,10 +243,10 @@ func TestMSKDiscoveryListNodes(t *testing.T) {
ctx := context.Background()
for _, tt := range []struct {
name string
mskData *mskDataStore
clusterARN string
expected []types.NodeInfo
name string
mskData *mskDataStore
clusters []types.Cluster
expected map[string][]types.NodeInfo
}{
{
name: "ClusterWithBrokers",
@ -280,30 +281,36 @@ func TestMSKDiscoveryListNodes(t *testing.T) {
},
},
},
clusterARN: "arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123",
expected: []types.NodeInfo{
clusters: []types.Cluster{
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"),
AddedToClusterTime: strptr("2023-01-01T00:00:00Z"),
InstanceType: strptr("kafka.m5.large"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(1),
ClientSubnet: strptr("subnet-12345"),
ClientVpcIpAddress: strptr("10.0.1.100"),
Endpoints: []string{"b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com"},
AttachedENIId: strptr("eni-12345"),
},
ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"),
},
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"),
AddedToClusterTime: strptr("2023-01-01T00:00:00Z"),
InstanceType: strptr("kafka.m5.large"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(2),
ClientSubnet: strptr("subnet-67890"),
ClientVpcIpAddress: strptr("10.0.1.101"),
Endpoints: []string{"b-2.test-cluster.abc123.kafka.us-west-2.amazonaws.com"},
AttachedENIId: strptr("eni-67890"),
},
expected: map[string][]types.NodeInfo{
"arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123": {
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"),
AddedToClusterTime: strptr("2023-01-01T00:00:00Z"),
InstanceType: strptr("kafka.m5.large"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(1),
ClientSubnet: strptr("subnet-12345"),
ClientVpcIpAddress: strptr("10.0.1.100"),
Endpoints: []string{"b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com"},
AttachedENIId: strptr("eni-12345"),
},
},
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"),
AddedToClusterTime: strptr("2023-01-01T00:00:00Z"),
InstanceType: strptr("kafka.m5.large"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(2),
ClientSubnet: strptr("subnet-67890"),
ClientVpcIpAddress: strptr("10.0.1.101"),
Endpoints: []string{"b-2.test-cluster.abc123.kafka.us-west-2.amazonaws.com"},
AttachedENIId: strptr("eni-67890"),
},
},
},
},
@ -316,8 +323,68 @@ func TestMSKDiscoveryListNodes(t *testing.T) {
"arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789": {},
},
},
clusterARN: "arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789",
expected: nil,
clusters: []types.Cluster{
{
ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789"),
},
},
expected: map[string][]types.NodeInfo{
"arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789": nil,
},
},
{
name: "MultipleClusters",
mskData: &mskDataStore{
region: "us-west-2",
nodes: map[string][]types.NodeInfo{
"arn:aws:kafka:us-west-2:123456789012:cluster/cluster-1/abc-123": {
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"),
InstanceType: strptr("kafka.m5.large"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(1),
},
},
},
"arn:aws:kafka:us-west-2:123456789012:cluster/cluster-2/def-456": {
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"),
InstanceType: strptr("kafka.m5.xlarge"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(2),
},
},
},
},
},
clusters: []types.Cluster{
{
ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/cluster-1/abc-123"),
},
{
ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/cluster-2/def-456"),
},
},
expected: map[string][]types.NodeInfo{
"arn:aws:kafka:us-west-2:123456789012:cluster/cluster-1/abc-123": {
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"),
InstanceType: strptr("kafka.m5.large"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(1),
},
},
},
"arn:aws:kafka:us-west-2:123456789012:cluster/cluster-2/def-456": {
{
NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"),
InstanceType: strptr("kafka.m5.xlarge"),
BrokerNodeInfo: &types.BrokerNodeInfo{
BrokerId: aws.Float64(2),
},
},
},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
@ -326,11 +393,12 @@ func TestMSKDiscoveryListNodes(t *testing.T) {
d := &MSKDiscovery{
msk: client,
cfg: &MSKSDConfig{
Region: tt.mskData.region,
Region: tt.mskData.region,
RequestConcurrency: 10,
},
}
nodes, err := d.listNodes(ctx, tt.clusterARN)
nodes, err := d.listNodes(ctx, tt.clusters)
require.NoError(t, err)
require.Equal(t, tt.expected, nodes)
})
@ -398,9 +466,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) {
},
},
config: &MSKSDConfig{
Region: "us-west-2",
Port: 80,
Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"},
Region: "us-west-2",
Port: 80,
RequestConcurrency: 10,
Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"},
},
expected: []*targetgroup.Group{
{
@ -441,9 +510,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) {
clusters: []types.Cluster{},
},
config: &MSKSDConfig{
Region: "us-east-1",
Port: 80,
Clusters: []string{}, // Empty clusters list uses listClusters
Region: "us-east-1",
Port: 80,
RequestConcurrency: 10,
Clusters: []string{}, // Empty clusters list uses listClusters
},
expected: []*targetgroup.Group{
{
@ -499,9 +569,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) {
},
},
config: &MSKSDConfig{
Region: "us-west-2",
Port: 80,
Clusters: nil, // nil clusters list uses listClusters (backward compatibility)
Region: "us-west-2",
Port: 80,
RequestConcurrency: 10,
Clusters: nil, // nil clusters list uses listClusters (backward compatibility)
},
expected: []*targetgroup.Group{
{
@ -612,9 +683,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) {
},
},
config: &MSKSDConfig{
Region: "us-west-2",
Port: 80,
Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"},
Region: "us-west-2",
Port: 80,
RequestConcurrency: 10,
Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"},
},
expected: []*targetgroup.Group{
{
@ -764,9 +836,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) {
},
},
config: &MSKSDConfig{
Region: "us-east-1",
Port: 80,
Clusters: []string{"arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"},
Region: "us-east-1",
Port: 80,
RequestConcurrency: 10,
Clusters: []string{"arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"},
},
expected: []*targetgroup.Group{
{
@ -922,8 +995,9 @@ func TestMSKDiscoveryRefresh(t *testing.T) {
if config == nil {
// Default config for backward compatibility
config = &MSKSDConfig{
Region: tt.mskData.region,
Port: 80,
Region: tt.mskData.region,
Port: 80,
RequestConcurrency: 10,
}
}