diff --git a/discovery/aws/msk.go b/discovery/aws/msk.go index a68960066f..3ecc1e6235 100644 --- a/discovery/aws/msk.go +++ b/discovery/aws/msk.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" + "golang.org/x/sync/errgroup" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/refresh" @@ -87,9 +88,10 @@ const ( // DefaultMSKSDConfig is the default MSK SD configuration. var DefaultMSKSDConfig = MSKSDConfig{ - Port: 80, - RefreshInterval: model.Duration(60 * time.Second), - HTTPClientConfig: config.DefaultHTTPClientConfig, + Port: 80, + RefreshInterval: model.Duration(60 * time.Second), + RequestConcurrency: 10, + HTTPClientConfig: config.DefaultHTTPClientConfig, } func init() { @@ -108,7 +110,8 @@ type MSKSDConfig struct { Port int `yaml:"port"` RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` - HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` + RequestConcurrency int `yaml:"request_concurrency,omitempty"` + HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` } // NewDiscovererMetrics implements discovery.Config. @@ -247,39 +250,33 @@ func (d *MSKDiscovery) initMskClient(ctx context.Context) error { return nil } +// describeClusters describes the clusters with the given ARNs and returns their details. func (d *MSKDiscovery) describeClusters(ctx context.Context, clusterARNs []string) ([]types.Cluster, error) { var ( clusters []types.Cluster - wg sync.WaitGroup mu sync.Mutex - errs []error ) + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) for _, clusterARN := range clusterARNs { - wg.Add(1) - go func(clusterARN string) { - defer wg.Done() - cluster, err := d.msk.DescribeClusterV2(ctx, &kafka.DescribeClusterV2Input{ + errg.Go(func() error { + cluster, err := d.msk.DescribeClusterV2(ectx, &kafka.DescribeClusterV2Input{ ClusterArn: aws.String(clusterARN), }) if err != nil { - mu.Lock() - errs = append(errs, fmt.Errorf("could not describe cluster %v: %w", clusterARN, err)) - mu.Unlock() - return + return fmt.Errorf("could not describe cluster %v: %w", clusterARN, err) } mu.Lock() clusters = append(clusters, *cluster.ClusterInfo) mu.Unlock() - }(clusterARN) - } - wg.Wait() - if len(errs) > 0 { - return nil, fmt.Errorf("errors occurred while describing clusters: %v", errs) + return nil + }) } - return clusters, nil + return clusters, errg.Wait() } +// listClusters lists all MSK clusters in the configured region and returns their details. func (d *MSKDiscovery) listClusters(ctx context.Context) ([]types.Cluster, error) { var ( clusters []types.Cluster @@ -307,29 +304,42 @@ func (d *MSKDiscovery) listClusters(ctx context.Context) ([]types.Cluster, error return clusters, nil } -func (d *MSKDiscovery) listNodes(ctx context.Context, clusterARN string) ([]types.NodeInfo, error) { - var ( - nodes []types.NodeInfo - nextToken *string - ) - for { - resp, err := d.msk.ListNodes(ctx, &kafka.ListNodesInput{ - ClusterArn: aws.String(clusterARN), - MaxResults: aws.Int32(100), - NextToken: nextToken, - }) - if err != nil { - return nil, fmt.Errorf("could not list nodes for cluster %v: %w", clusterARN, err) - } +// listNodes lists all nodes for the given clusters and returns a map of cluster ARN to its nodes. +func (d *MSKDiscovery) listNodes(ctx context.Context, clusters []types.Cluster) (map[string][]types.NodeInfo, error) { + clusterNodeMap := make(map[string][]types.NodeInfo) + mu := sync.Mutex{} + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + for _, cluster := range clusters { + clusterARN := aws.ToString(cluster.ClusterArn) + errg.Go(func() error { + var clusterNodes []types.NodeInfo + var nextToken *string + for { + resp, err := d.msk.ListNodes(ectx, &kafka.ListNodesInput{ + ClusterArn: aws.String(clusterARN), + MaxResults: aws.Int32(100), + NextToken: nextToken, + }) + if err != nil { + return fmt.Errorf("could not list nodes for cluster %v: %w", clusterARN, err) + } - nodes = append(nodes, resp.NodeInfoList...) - if resp.NextToken == nil { - break - } - nextToken = resp.NextToken + clusterNodes = append(clusterNodes, resp.NodeInfoList...) + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken + } + + mu.Lock() + clusterNodeMap[clusterARN] = clusterNodes + mu.Unlock() + return nil + }) } - return nodes, nil + return clusterNodeMap, errg.Wait() } func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { @@ -355,21 +365,20 @@ func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error } } + clusterNodeMap, err := d.listNodes(ctx, clusters) + if err != nil { + return nil, err + } + var ( targetsMu sync.Mutex wg sync.WaitGroup ) for _, cluster := range clusters { wg.Add(1) - go func(cluster types.Cluster) { + + go func(cluster types.Cluster, nodes []types.NodeInfo) { defer wg.Done() - - nodes, err := d.listNodes(ctx, aws.ToString(cluster.ClusterArn)) - if err != nil { - d.logger.Error("Failed to list nodes", "cluster", aws.ToString(cluster.ClusterName), "error", err) - return - } - for _, node := range nodes { labels := model.LabelSet{ mskLabelClusterName: model.LabelValue(aws.ToString(cluster.ClusterName)), @@ -425,7 +434,7 @@ func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error continue } } - }(cluster) + }(cluster, clusterNodeMap[aws.ToString(cluster.ClusterArn)]) } wg.Wait() diff --git a/discovery/aws/msk_test.go b/discovery/aws/msk_test.go index 31744221ef..b1d48a7ea6 100644 --- a/discovery/aws/msk_test.go +++ b/discovery/aws/msk_test.go @@ -218,7 +218,8 @@ func TestMSKDiscoveryDescribeClusters(t *testing.T) { d := &MSKDiscovery{ msk: client, cfg: &MSKSDConfig{ - Region: tt.mskData.region, + Region: tt.mskData.region, + RequestConcurrency: 10, }, } @@ -242,10 +243,10 @@ func TestMSKDiscoveryListNodes(t *testing.T) { ctx := context.Background() for _, tt := range []struct { - name string - mskData *mskDataStore - clusterARN string - expected []types.NodeInfo + name string + mskData *mskDataStore + clusters []types.Cluster + expected map[string][]types.NodeInfo }{ { name: "ClusterWithBrokers", @@ -280,30 +281,36 @@ func TestMSKDiscoveryListNodes(t *testing.T) { }, }, }, - clusterARN: "arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123", - expected: []types.NodeInfo{ + clusters: []types.Cluster{ { - NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), - AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), - InstanceType: strptr("kafka.m5.large"), - BrokerNodeInfo: &types.BrokerNodeInfo{ - BrokerId: aws.Float64(1), - ClientSubnet: strptr("subnet-12345"), - ClientVpcIpAddress: strptr("10.0.1.100"), - Endpoints: []string{"b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, - AttachedENIId: strptr("eni-12345"), - }, + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"), }, - { - NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), - AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), - InstanceType: strptr("kafka.m5.large"), - BrokerNodeInfo: &types.BrokerNodeInfo{ - BrokerId: aws.Float64(2), - ClientSubnet: strptr("subnet-67890"), - ClientVpcIpAddress: strptr("10.0.1.101"), - Endpoints: []string{"b-2.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, - AttachedENIId: strptr("eni-67890"), + }, + expected: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + ClientSubnet: strptr("subnet-12345"), + ClientVpcIpAddress: strptr("10.0.1.100"), + Endpoints: []string{"b-1.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-12345"), + }, + }, + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), + AddedToClusterTime: strptr("2023-01-01T00:00:00Z"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(2), + ClientSubnet: strptr("subnet-67890"), + ClientVpcIpAddress: strptr("10.0.1.101"), + Endpoints: []string{"b-2.test-cluster.abc123.kafka.us-west-2.amazonaws.com"}, + AttachedENIId: strptr("eni-67890"), + }, }, }, }, @@ -316,8 +323,68 @@ func TestMSKDiscoveryListNodes(t *testing.T) { "arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789": {}, }, }, - clusterARN: "arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789", - expected: nil, + clusters: []types.Cluster{ + { + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789"), + }, + }, + expected: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/empty-cluster/xyz-789": nil, + }, + }, + { + name: "MultipleClusters", + mskData: &mskDataStore{ + region: "us-west-2", + nodes: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/cluster-1/abc-123": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + }, + }, + }, + "arn:aws:kafka:us-west-2:123456789012:cluster/cluster-2/def-456": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), + InstanceType: strptr("kafka.m5.xlarge"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(2), + }, + }, + }, + }, + }, + clusters: []types.Cluster{ + { + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/cluster-1/abc-123"), + }, + { + ClusterArn: strptr("arn:aws:kafka:us-west-2:123456789012:cluster/cluster-2/def-456"), + }, + }, + expected: map[string][]types.NodeInfo{ + "arn:aws:kafka:us-west-2:123456789012:cluster/cluster-1/abc-123": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-1"), + InstanceType: strptr("kafka.m5.large"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(1), + }, + }, + }, + "arn:aws:kafka:us-west-2:123456789012:cluster/cluster-2/def-456": { + { + NodeARN: strptr("arn:aws:kafka:us-west-2:123456789012:node/broker-2"), + InstanceType: strptr("kafka.m5.xlarge"), + BrokerNodeInfo: &types.BrokerNodeInfo{ + BrokerId: aws.Float64(2), + }, + }, + }, + }, }, } { t.Run(tt.name, func(t *testing.T) { @@ -326,11 +393,12 @@ func TestMSKDiscoveryListNodes(t *testing.T) { d := &MSKDiscovery{ msk: client, cfg: &MSKSDConfig{ - Region: tt.mskData.region, + Region: tt.mskData.region, + RequestConcurrency: 10, }, } - nodes, err := d.listNodes(ctx, tt.clusterARN) + nodes, err := d.listNodes(ctx, tt.clusters) require.NoError(t, err) require.Equal(t, tt.expected, nodes) }) @@ -398,9 +466,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) { }, }, config: &MSKSDConfig{ - Region: "us-west-2", - Port: 80, - Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"}, + Region: "us-west-2", + Port: 80, + RequestConcurrency: 10, + Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/test-cluster/abc-123"}, }, expected: []*targetgroup.Group{ { @@ -441,9 +510,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) { clusters: []types.Cluster{}, }, config: &MSKSDConfig{ - Region: "us-east-1", - Port: 80, - Clusters: []string{}, // Empty clusters list uses listClusters + Region: "us-east-1", + Port: 80, + RequestConcurrency: 10, + Clusters: []string{}, // Empty clusters list uses listClusters }, expected: []*targetgroup.Group{ { @@ -499,9 +569,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) { }, }, config: &MSKSDConfig{ - Region: "us-west-2", - Port: 80, - Clusters: nil, // nil clusters list uses listClusters (backward compatibility) + Region: "us-west-2", + Port: 80, + RequestConcurrency: 10, + Clusters: nil, // nil clusters list uses listClusters (backward compatibility) }, expected: []*targetgroup.Group{ { @@ -612,9 +683,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) { }, }, config: &MSKSDConfig{ - Region: "us-west-2", - Port: 80, - Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"}, + Region: "us-west-2", + Port: 80, + RequestConcurrency: 10, + Clusters: []string{"arn:aws:kafka:us-west-2:123456789012:cluster/kraft-cluster/xyz-789"}, }, expected: []*targetgroup.Group{ { @@ -764,9 +836,10 @@ func TestMSKDiscoveryRefresh(t *testing.T) { }, }, config: &MSKSDConfig{ - Region: "us-east-1", - Port: 80, - Clusters: []string{"arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"}, + Region: "us-east-1", + Port: 80, + RequestConcurrency: 10, + Clusters: []string{"arn:aws:kafka:us-east-1:123456789012:cluster/multi-endpoint-cluster/abc-999"}, }, expected: []*targetgroup.Group{ { @@ -922,8 +995,9 @@ func TestMSKDiscoveryRefresh(t *testing.T) { if config == nil { // Default config for backward compatibility config = &MSKSDConfig{ - Region: tt.mskData.region, - Port: 80, + Region: tt.mskData.region, + Port: 80, + RequestConcurrency: 10, } }