// Copyright The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package aws import ( "context" "errors" "fmt" "log/slog" "maps" "net" "strconv" "strings" "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" awsConfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/credentials/stscreds" "github.com/aws/aws-sdk-go-v2/service/elasticache" "github.com/aws/aws-sdk-go-v2/service/elasticache/types" "github.com/aws/aws-sdk-go-v2/service/sts" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promslog" "golang.org/x/sync/errgroup" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/refresh" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/util/strutil" ) const ( elasticacheLabel = model.MetaLabelPrefix + "elasticache_" elasticacheLabelDeploymentOption = elasticacheLabel + "deployment_option" // cache cluster. elasticacheLabelCacheCluster = elasticacheLabel + "cache_cluster_" elasticacheLabelCacheClusterARN = elasticacheLabelCacheCluster + "arn" elasticacheLabelCacheClusterAtRestEncryptionEnabled = elasticacheLabelCacheCluster + "at_rest_encryption_enabled" elasticacheLabelCacheClusterAuthTokenEnabled = elasticacheLabelCacheCluster + "auth_token_enabled" elasticacheLabelCacheClusterAuthTokenLastModified = elasticacheLabelCacheCluster + "auth_token_last_modified" elasticacheLabelCacheClusterAutoMinorVersionUpgrade = elasticacheLabelCacheCluster + "auto_minor_version_upgrade" elasticacheLabelCacheClusterCreateTime = elasticacheLabelCacheCluster + "cache_cluster_create_time" elasticacheLabelCacheClusterID = elasticacheLabelCacheCluster + "cache_cluster_id" elasticacheLabelCacheClusterStatus = elasticacheLabelCacheCluster + "cache_cluster_status" elasticacheLabelCacheClusterNodeType = elasticacheLabelCacheCluster + "cache_node_type" elasticacheLabelCacheClusterParameterGroup = elasticacheLabelCacheCluster + "cache_parameter_group" elasticacheLabelCacheClusterSubnetGroupName = elasticacheLabelCacheCluster + "cache_subnet_group_name" elasticacheLabelCacheClusterClientDownloadLandingPage = elasticacheLabelCacheCluster + "client_download_landing_page" elasticacheLabelCacheClusterEngine = elasticacheLabelCacheCluster + "engine" elasticacheLabelCacheClusterEngineVersion = elasticacheLabelCacheCluster + "engine_version" elasticacheLabelCacheClusterIPDiscovery = elasticacheLabelCacheCluster + "ip_discovery" elasticacheLabelCacheClusterNetworkType = elasticacheLabelCacheCluster + "network_type" elasticacheLabelCacheClusterNumCacheNodes = elasticacheLabelCacheCluster + "num_cache_nodes" elasticacheLabelCacheClusterPreferredAvailabilityZone = elasticacheLabelCacheCluster + "preferred_availability_zone" elasticacheLabelCacheClusterPreferredMaintenanceWindow = elasticacheLabelCacheCluster + "preferred_maintenance_window" elasticacheLabelCacheClusterPreferredOutpostARN = elasticacheLabelCacheCluster + "preferred_outpost_arn" elasticacheLabelCacheClusterReplicationGroupID = elasticacheLabelCacheCluster + "replication_group_id" elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled = elasticacheLabelCacheCluster + "replication_group_log_delivery_enabled" elasticacheLabelCacheClusterSnapshotRetentionLimit = elasticacheLabelCacheCluster + "snapshot_retention_limit" elasticacheLabelCacheClusterSnapshotWindow = elasticacheLabelCacheCluster + "snapshot_window" elasticacheLabelCacheClusterTransitEncryptionEnabled = elasticacheLabelCacheCluster + "transit_encryption_enabled" elasticacheLabelCacheClusterTransitEncryptionMode = elasticacheLabelCacheCluster + "transit_encryption_mode" // configuration endpoint. elasticacheLabelCacheClusterConfigurationEndpoint = elasticacheLabelCacheCluster + "configuration_endpoint_" elasticacheLabelCacheClusterConfigurationEndpointAddress = elasticacheLabelCacheClusterConfigurationEndpoint + "address" elasticacheLabelCacheClusterConfigurationEndpointPort = elasticacheLabelCacheClusterConfigurationEndpoint + "port" // notification. elasticacheLabelCacheClusterNotification = elasticacheLabelCacheCluster + "notification_" elasticacheLabelCacheClusterNotificationTopicARN = elasticacheLabelCacheClusterNotification + "topic_arn" elasticacheLabelCacheClusterNotificationTopicStatus = elasticacheLabelCacheClusterNotification + "topic_status" // log delivery configuration (slice - use with index). elasticacheLabelCacheClusterLogDeliveryConfiguration = elasticacheLabelCacheCluster + "log_delivery_configuration_" elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType = elasticacheLabelCacheClusterLogDeliveryConfiguration + "destination_type" elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_format" elasticacheLabelCacheClusterLogDeliveryConfigurationLogType = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_type" elasticacheLabelCacheClusterLogDeliveryConfigurationStatus = elasticacheLabelCacheClusterLogDeliveryConfiguration + "status" elasticacheLabelCacheClusterLogDeliveryConfigurationMessage = elasticacheLabelCacheClusterLogDeliveryConfiguration + "message" elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_group" elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream = elasticacheLabelCacheClusterLogDeliveryConfiguration + "delivery_stream" // pending modified values. elasticacheLabelCacheClusterPendingModifiedValues = elasticacheLabelCacheCluster + "pending_modified_values_" elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus = elasticacheLabelCacheClusterPendingModifiedValues + "auth_token_status" elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_type" elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion = elasticacheLabelCacheClusterPendingModifiedValues + "engine_version" elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes = elasticacheLabelCacheClusterPendingModifiedValues + "num_cache_nodes" elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_enabled" elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_mode" elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_ids_to_remove" // security group membership (slice - use with index). elasticacheLabelCacheClusterSecurityGroupMembership = elasticacheLabelCacheCluster + "security_group_membership_" elasticacheLabelCacheClusterSecurityGroupMembershipID = elasticacheLabelCacheClusterSecurityGroupMembership + "id" elasticacheLabelCacheClusterSecurityGroupMembershipStatus = elasticacheLabelCacheClusterSecurityGroupMembership + "status" // tags - create one label per tag key, with the format: elasticache_cache_cluster_tag_. elasticacheLabelCacheClusterTag = elasticacheLabelCacheCluster + "tag_" // node. elasticacheLabelCacheClusterNode = elasticacheLabelCacheCluster + "node_" elasticacheLabelCacheClusterNodeCreateTime = elasticacheLabelCacheClusterNode + "create_time" elasticacheLabelCacheClusterNodeID = elasticacheLabelCacheClusterNode + "id" elasticacheLabelCacheClusterNodeStatus = elasticacheLabelCacheClusterNode + "status" elasticacheLabelCacheClusterNodeAZ = elasticacheLabelCacheClusterNode + "availability_zone" elasticacheLabelCacheClusterNodeCustomerOutpostARN = elasticacheLabelCacheClusterNode + "customer_outpost_arn" elasticacheLabelCacheClusterNodeSourceCacheNodeID = elasticacheLabelCacheClusterNode + "source_cache_node_id" elasticacheLabelCacheClusterNodeParameterGroupStatus = elasticacheLabelCacheClusterNode + "parameter_group_status" // endpoint. elasticacheLabelCacheClusterNodeEndpoint = elasticacheLabelCacheClusterNode + "endpoint_" elasticacheLabelCacheClusterNodeEndpointAddress = elasticacheLabelCacheClusterNodeEndpoint + "address" elasticacheLabelCacheClusterNodeEndpointPort = elasticacheLabelCacheClusterNodeEndpoint + "port" // serverless cache. elasticacheLabelServerlessCache = elasticacheLabel + "serverless_cache_" elasticacheLabelServerlessCacheARN = elasticacheLabelServerlessCache + "arn" elasticacheLabelServerlessCacheName = elasticacheLabelServerlessCache + "name" elasticacheLabelServerlessCacheCreateTime = elasticacheLabelServerlessCache + "create_time" elasticacheLabelServerlessCacheDescription = elasticacheLabelServerlessCache + "description" elasticacheLabelServerlessCacheEngine = elasticacheLabelServerlessCache + "engine" elasticacheLabelServerlessCacheFullEngineVersion = elasticacheLabelServerlessCache + "full_engine_version" elasticacheLabelServerlessCacheMajorEngineVersion = elasticacheLabelServerlessCache + "major_engine_version" elasticacheLabelServerlessCacheStatus = elasticacheLabelServerlessCache + "status" elasticacheLabelServerlessCacheKmsKeyID = elasticacheLabelServerlessCache + "kms_key_id" elasticacheLabelServerlessCacheUserGroupID = elasticacheLabelServerlessCache + "user_group_id" elasticacheLabelServerlessCacheDailySnapshotTime = elasticacheLabelServerlessCache + "daily_snapshot_time" elasticacheLabelServerlessCacheSnapshotRetentionLimit = elasticacheLabelServerlessCache + "snapshot_retention_limit" // endpoint. elasticacheLabelServerlessCacheEndpoint = elasticacheLabelServerlessCache + "endpoint_" elasticacheLabelServerlessCacheEndpointAddress = elasticacheLabelServerlessCacheEndpoint + "address" elasticacheLabelServerlessCacheEndpointPort = elasticacheLabelServerlessCacheEndpoint + "port" elasticacheLabelServerlessCacheReaderEndpointAddress = elasticacheLabelServerlessCacheEndpoint + "reader_address" elasticacheLabelServerlessCacheReaderEndpointPort = elasticacheLabelServerlessCacheEndpoint + "reader_port" // security group membership (slice - use with index). elasticacheLabelServerlessCacheSecurityGroupID = elasticacheLabelServerlessCache + "security_group_id" // Subnet group membership (slice - use with index). elasticacheLabelServerlessCacheSubnetID = elasticacheLabelServerlessCache + "subnet_id" // cache usage limits. elasticacheLabelServerlessCacheCacheUsageLimit = elasticacheLabelServerlessCache + "cache_usage_limit_" elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage = elasticacheLabelServerlessCacheCacheUsageLimit + "data_storage" elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "maximum" elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "minimum" elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "unit" elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond = elasticacheLabelServerlessCacheCacheUsageLimit + "ecpu_per_second" elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "maximum" elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "minimum" // tags - create one label per tag key, with the format: elasticache_serverless_cache_tag_. elasticacheLabelServerlessCacheTag = elasticacheLabelServerlessCache + "tag_" ) // DefaultElasticacheSDConfig is the default Elasticache SD configuration. var DefaultElasticacheSDConfig = ElasticacheSDConfig{ Port: 80, RefreshInterval: model.Duration(60 * time.Second), RequestConcurrency: 10, HTTPClientConfig: config.DefaultHTTPClientConfig, } func init() { discovery.RegisterConfig(&ElasticacheSDConfig{}) } // ElasticacheSDConfig is the configuration for Elasticache based service discovery. type ElasticacheSDConfig struct { Region string `yaml:"region"` Endpoint string `yaml:"endpoint"` AccessKey string `yaml:"access_key,omitempty"` SecretKey config.Secret `yaml:"secret_key,omitempty"` Profile string `yaml:"profile,omitempty"` RoleARN string `yaml:"role_arn,omitempty"` Clusters []string `yaml:"clusters,omitempty"` Port int `yaml:"port"` RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` // RequestConcurrency controls the maximum number of concurrent Elasticache API requests. RequestConcurrency int `yaml:"request_concurrency,omitempty"` HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` } // NewDiscovererMetrics implements discovery.Config. func (*ElasticacheSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics { return &elasticacheMetrics{ refreshMetrics: rmi, } } // Name returns the name of the Elasticache Config. func (*ElasticacheSDConfig) Name() string { return "elasticache" } // NewDiscoverer returns a Discoverer for the Elasticache Config. func (c *ElasticacheSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { return NewElasticacheDiscovery(c, opts) } // UnmarshalYAML implements the yaml.Unmarshaler interface for the Elasticache Config. func (c *ElasticacheSDConfig) UnmarshalYAML(unmarshal func(any) error) error { *c = DefaultElasticacheSDConfig type plain ElasticacheSDConfig err := unmarshal((*plain)(c)) if err != nil { return err } c.Region, err = loadRegion(context.Background(), c.Region) if err != nil { return fmt.Errorf("could not determine AWS region: %w", err) } return c.HTTPClientConfig.Validate() } type elasticacheClient interface { DescribeServerlessCaches(ctx context.Context, params *elasticache.DescribeServerlessCachesInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeServerlessCachesOutput, error) DescribeCacheClusters(ctx context.Context, params *elasticache.DescribeCacheClustersInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeCacheClustersOutput, error) ListTagsForResource(ctx context.Context, params *elasticache.ListTagsForResourceInput, optFns ...func(*elasticache.Options)) (*elasticache.ListTagsForResourceOutput, error) } // ElasticacheDiscovery periodically performs Elasticache-SD requests. // It implements the Discoverer interface. type ElasticacheDiscovery struct { *refresh.Discovery logger *slog.Logger cfg *ElasticacheSDConfig elasticacheClient elasticacheClient } // NewElasticacheDiscovery returns a new ElasticacheDiscovery which periodically refreshes its targets. func NewElasticacheDiscovery(conf *ElasticacheSDConfig, opts discovery.DiscovererOptions) (*ElasticacheDiscovery, error) { m, ok := opts.Metrics.(*elasticacheMetrics) if !ok { return nil, errors.New("invalid discovery metrics type") } if opts.Logger == nil { opts.Logger = promslog.NewNopLogger() } d := &ElasticacheDiscovery{ logger: opts.Logger, cfg: conf, } d.Discovery = refresh.NewDiscovery( refresh.Options{ Logger: opts.Logger, Mech: "elasticache", Interval: time.Duration(d.cfg.RefreshInterval), RefreshF: d.refresh, MetricsInstantiator: m.refreshMetrics, }, ) return d, nil } func (d *ElasticacheDiscovery) initElasticacheClient(ctx context.Context) error { if d.elasticacheClient != nil { return nil } if d.cfg.Region == "" { return errors.New("region must be set for Elasticache service discovery") } // Build the HTTP client from the provided HTTPClientConfig. client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "elasticache_sd") if err != nil { return err } // Build the AWS config with the provided region. var configOptions []func(*awsConfig.LoadOptions) error configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region)) configOptions = append(configOptions, awsConfig.WithHTTPClient(client)) // Only set static credentials if both access key and secret key are provided // Otherwise, let AWS SDK use its default credential chain if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" { credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "") configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider)) } if d.cfg.Profile != "" { configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile)) } cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...) if err != nil { d.logger.Error("Failed to create AWS config", "error", err) return fmt.Errorf("could not create aws config: %w", err) } // If the role ARN is set, assume the role to get credentials and set the credentials provider in the config. if d.cfg.RoleARN != "" { assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN) cfg.Credentials = aws.NewCredentialsCache(assumeProvider) } d.elasticacheClient = elasticache.NewFromConfig(cfg, func(options *elasticache.Options) { if d.cfg.Endpoint != "" { options.BaseEndpoint = &d.cfg.Endpoint } options.HTTPClient = client }) // Test credentials by making a simple API call testCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() _, err = d.elasticacheClient.DescribeCacheClusters(testCtx, &elasticache.DescribeCacheClustersInput{}) if err != nil { d.logger.Error("Failed to test Elasticache credentials", "error", err) return fmt.Errorf("elasticache credential test failed: %w", err) } return nil } // describeServerlessCaches calls DescribeServerlessCaches API for the given cache IDs (or all caches if no IDs are provided) and returns the list of serverless caches. func (d *ElasticacheDiscovery) describeServerlessCaches(ctx context.Context, caches []string) ([]types.ServerlessCache, error) { mu := &sync.Mutex{} errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) var serverlessCaches []types.ServerlessCache if len(caches) == 0 { errg.Go(func() error { var nextToken *string for { output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{ MaxResults: aws.Int32(50), NextToken: nextToken, }) if err != nil { return fmt.Errorf("failed to describe serverless caches: %w", err) } mu.Lock() serverlessCaches = append(serverlessCaches, output.ServerlessCaches...) mu.Unlock() if output.NextToken == nil { break } nextToken = output.NextToken } return nil }) } else { for _, cacheID := range caches { errg.Go(func() error { output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{ MaxResults: aws.Int32(50), NextToken: nil, ServerlessCacheName: aws.String(cacheID), }) if err != nil { return fmt.Errorf("failed to describe serverless cache %s: %w", cacheID, err) } mu.Lock() serverlessCaches = append(serverlessCaches, output.ServerlessCaches...) mu.Unlock() return nil }) } } return serverlessCaches, errg.Wait() } // describeCacheClusters calls DescribeCacheClusters API for the given cache cluster IDs (or all cache clusters if no IDs are provided) and returns the list of cache clusters. func (d *ElasticacheDiscovery) describeCacheClusters(ctx context.Context, caches []string) ([]types.CacheCluster, error) { mu := &sync.Mutex{} errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) showCacheClustersNotInReplicationGroupsBools := []bool{false, true} var cacheClusters []types.CacheCluster if len(caches) == 0 { for _, showCacheClustersNotInReplicationGroupsBool := range showCacheClustersNotInReplicationGroupsBools { errg.Go(func() error { var nextToken *string for { output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{ MaxRecords: aws.Int32(100), Marker: nextToken, ShowCacheNodeInfo: aws.Bool(true), ShowCacheClustersNotInReplicationGroups: aws.Bool(showCacheClustersNotInReplicationGroupsBool), }) if err != nil { return fmt.Errorf("failed to describe cache clusters: %w", err) } mu.Lock() cacheClusters = append(cacheClusters, output.CacheClusters...) mu.Unlock() if output.Marker == nil { break } nextToken = output.Marker } return nil }) } } else { for _, cacheID := range caches { for _, showCacheClustersNotInReplicationGroupsBool := range showCacheClustersNotInReplicationGroupsBools { errg.Go(func() error { output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{ MaxRecords: aws.Int32(100), Marker: nil, ShowCacheNodeInfo: aws.Bool(true), ShowCacheClustersNotInReplicationGroups: aws.Bool(showCacheClustersNotInReplicationGroupsBool), CacheClusterId: aws.String(cacheID), }) if err != nil { return fmt.Errorf("failed to describe cache cluster %s: %w", cacheID, err) } mu.Lock() cacheClusters = append(cacheClusters, output.CacheClusters...) mu.Unlock() return nil }) } } } return cacheClusters, errg.Wait() } // listTagsForResource calls ListTagsForResource API for the given resource ARNs and returns a map of resource ARN to list of tags. func (d *ElasticacheDiscovery) listTagsForResource(ctx context.Context, resourceARNs []string) (map[string][]types.Tag, error) { mu := &sync.Mutex{} errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) tagsByResourceARN := make(map[string][]types.Tag) for _, resourceARN := range resourceARNs { errg.Go(func() error { output, err := d.elasticacheClient.ListTagsForResource(ectx, &elasticache.ListTagsForResourceInput{ ResourceName: aws.String(resourceARN), }) if err != nil { return fmt.Errorf("failed to list tags for resource %s: %w", resourceARN, err) } mu.Lock() tagsByResourceARN[resourceARN] = output.TagList mu.Unlock() return nil }) } return tagsByResourceARN, errg.Wait() } func (d *ElasticacheDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { err := d.initElasticacheClient(ctx) if err != nil { return nil, err } var clusters []string clustersMu := sync.Mutex{} serverlessCacheIDs, cacheClusterIDs := splitCacheDeploymentOptions(d.cfg.Clusters) clusterErrg, clusterCtx := errgroup.WithContext(ctx) clusterErrg.Go(func() error { caches, err := d.describeServerlessCaches(clusterCtx, serverlessCacheIDs) if err != nil { return fmt.Errorf("failed to describe serverless caches: %w", err) } for _, cache := range caches { clustersMu.Lock() clusters = append(clusters, *cache.ARN) clustersMu.Unlock() } return nil }) clusterErrg.Go(func() error { cacheClusters, err := d.describeCacheClusters(clusterCtx, cacheClusterIDs) if err != nil { return fmt.Errorf("failed to describe cache clusters: %w", err) } for _, cluster := range cacheClusters { clustersMu.Lock() clusters = append(clusters, *cluster.ARN) clustersMu.Unlock() } return nil }) if err := clusterErrg.Wait(); err != nil { return nil, err } tagsByResourceARN, err := d.listTagsForResource(ctx, clusters) if err != nil { return nil, fmt.Errorf("failed to list tags for resources: %w", err) } tg := &targetgroup.Group{ Source: d.cfg.Region, } errg, ectx := errgroup.WithContext(ctx) errg.Go(func() error { caches, err := d.describeServerlessCaches(ectx, serverlessCacheIDs) if err != nil { return fmt.Errorf("failed to describe serverless caches: %w", err) } for _, cache := range caches { addServerlessCacheTargets(tg, &cache, tagsByResourceARN[*cache.ARN]) } return nil }) errg.Go(func() error { cacheClusters, err := d.describeCacheClusters(ectx, cacheClusterIDs) if err != nil { return fmt.Errorf("failed to describe cache clusters: %w", err) } for _, cluster := range cacheClusters { addCacheClusterTargets(tg, &cluster, tagsByResourceARN[*cluster.ARN]) } return nil }) if err := errg.Wait(); err != nil { return nil, err } return []*targetgroup.Group{tg}, nil } // splitCacheTypes takes a list of cache ARNs and splits them into serverless cache IDs and cache cluster IDs based on their format. // Serverless caches are in the format arn:aws:elasticache:::serverlesscache: // Cache clusters are in the format arn:aws:elasticache:::replicationgroup:. func splitCacheDeploymentOptions(caches []string) (serverlessCacheIDs, cacheClusterIDs []string) { for _, cacheARN := range caches { if len(cacheARN) == 0 { continue } parts := strings.Split(cacheARN, ":") if len(parts) < 6 { continue } resourceType := parts[5] resourceID := parts[6] switch resourceType { case "serverlesscache": serverlessCacheIDs = append(serverlessCacheIDs, resourceID) case "replicationgroup": cacheClusterIDs = append(cacheClusterIDs, resourceID) default: continue } } return serverlessCacheIDs, cacheClusterIDs } // addServerlessCacheTargets adds targets for a serverless cache to the target group. func addServerlessCacheTargets(tg *targetgroup.Group, cache *types.ServerlessCache, tags []types.Tag) { labels := model.LabelSet{ elasticacheLabelDeploymentOption: model.LabelValue("serverless"), elasticacheLabelServerlessCacheARN: model.LabelValue(*cache.ARN), elasticacheLabelServerlessCacheName: model.LabelValue(*cache.ServerlessCacheName), elasticacheLabelServerlessCacheStatus: model.LabelValue(*cache.Status), elasticacheLabelServerlessCacheEngine: model.LabelValue(*cache.Engine), elasticacheLabelServerlessCacheFullEngineVersion: model.LabelValue(*cache.FullEngineVersion), elasticacheLabelServerlessCacheMajorEngineVersion: model.LabelValue(*cache.MajorEngineVersion), } if cache.Description != nil { labels[elasticacheLabelServerlessCacheDescription] = model.LabelValue(*cache.Description) } if cache.CreateTime != nil { labels[elasticacheLabelServerlessCacheCreateTime] = model.LabelValue(cache.CreateTime.Format(time.RFC3339)) } if cache.KmsKeyId != nil { labels[elasticacheLabelServerlessCacheKmsKeyID] = model.LabelValue(*cache.KmsKeyId) } if cache.UserGroupId != nil { labels[elasticacheLabelServerlessCacheUserGroupID] = model.LabelValue(*cache.UserGroupId) } if cache.DailySnapshotTime != nil { labels[elasticacheLabelServerlessCacheDailySnapshotTime] = model.LabelValue(*cache.DailySnapshotTime) } if cache.SnapshotRetentionLimit != nil { labels[elasticacheLabelServerlessCacheSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cache.SnapshotRetentionLimit))) } if cache.Endpoint != nil { if cache.Endpoint.Address != nil { labels[elasticacheLabelServerlessCacheEndpointAddress] = model.LabelValue(*cache.Endpoint.Address) } if cache.Endpoint.Port != nil { labels[elasticacheLabelServerlessCacheEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.Endpoint.Port))) } } if cache.ReaderEndpoint != nil { if cache.ReaderEndpoint.Address != nil { labels[elasticacheLabelServerlessCacheReaderEndpointAddress] = model.LabelValue(*cache.ReaderEndpoint.Address) } if cache.ReaderEndpoint.Port != nil { labels[elasticacheLabelServerlessCacheReaderEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.ReaderEndpoint.Port))) } } for i, sgID := range cache.SecurityGroupIds { labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSecurityGroupID, i))] = model.LabelValue(sgID) } for i, subnetID := range cache.SubnetIds { labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSubnetID, i))] = model.LabelValue(subnetID) } if cache.CacheUsageLimits != nil { if cache.CacheUsageLimits.DataStorage != nil { if cache.CacheUsageLimits.DataStorage.Maximum != nil { labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.DataStorage.Maximum))) } if cache.CacheUsageLimits.DataStorage.Minimum != nil { labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.DataStorage.Minimum))) } labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit] = model.LabelValue(cache.CacheUsageLimits.DataStorage.Unit) } if cache.CacheUsageLimits.ECPUPerSecond != nil { if cache.CacheUsageLimits.ECPUPerSecond.Maximum != nil { labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.ECPUPerSecond.Maximum))) } if cache.CacheUsageLimits.ECPUPerSecond.Minimum != nil { labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.ECPUPerSecond.Minimum))) } } } for _, tag := range tags { if tag.Key != nil && tag.Value != nil { labels[model.LabelName(elasticacheLabelServerlessCacheTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value) } } // Set the address label using the endpoint if cache.Endpoint != nil && cache.Endpoint.Address != nil && cache.Endpoint.Port != nil { labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*cache.Endpoint.Address, strconv.Itoa(int(*cache.Endpoint.Port)))) } tg.Targets = append(tg.Targets, labels) } // addCacheClusterTargets adds targets for a cache cluster to the target group. // Creates one target per cache node for individual scraping. func addCacheClusterTargets(tg *targetgroup.Group, cluster *types.CacheCluster, tags []types.Tag) { // Build common labels that apply to all nodes in this cluster commonLabels := model.LabelSet{ elasticacheLabelDeploymentOption: model.LabelValue("node"), elasticacheLabelCacheClusterARN: model.LabelValue(*cluster.ARN), elasticacheLabelCacheClusterID: model.LabelValue(*cluster.CacheClusterId), elasticacheLabelCacheClusterStatus: model.LabelValue(*cluster.CacheClusterStatus), } if cluster.AtRestEncryptionEnabled != nil { commonLabels[elasticacheLabelCacheClusterAtRestEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AtRestEncryptionEnabled)) } if cluster.AuthTokenEnabled != nil { commonLabels[elasticacheLabelCacheClusterAuthTokenEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AuthTokenEnabled)) } if cluster.AuthTokenLastModifiedDate != nil { commonLabels[elasticacheLabelCacheClusterAuthTokenLastModified] = model.LabelValue(cluster.AuthTokenLastModifiedDate.Format(time.RFC3339)) } if cluster.AutoMinorVersionUpgrade != nil { commonLabels[elasticacheLabelCacheClusterAutoMinorVersionUpgrade] = model.LabelValue(strconv.FormatBool(*cluster.AutoMinorVersionUpgrade)) } if cluster.CacheClusterCreateTime != nil { commonLabels[elasticacheLabelCacheClusterCreateTime] = model.LabelValue(cluster.CacheClusterCreateTime.Format(time.RFC3339)) } if cluster.CacheNodeType != nil { commonLabels[elasticacheLabelCacheClusterNodeType] = model.LabelValue(*cluster.CacheNodeType) } if cluster.CacheParameterGroup != nil && cluster.CacheParameterGroup.CacheParameterGroupName != nil { commonLabels[elasticacheLabelCacheClusterParameterGroup] = model.LabelValue(*cluster.CacheParameterGroup.CacheParameterGroupName) } if cluster.CacheSubnetGroupName != nil { commonLabels[elasticacheLabelCacheClusterSubnetGroupName] = model.LabelValue(*cluster.CacheSubnetGroupName) } if cluster.ClientDownloadLandingPage != nil { commonLabels[elasticacheLabelCacheClusterClientDownloadLandingPage] = model.LabelValue(*cluster.ClientDownloadLandingPage) } if cluster.ConfigurationEndpoint != nil { if cluster.ConfigurationEndpoint.Address != nil { commonLabels[elasticacheLabelCacheClusterConfigurationEndpointAddress] = model.LabelValue(*cluster.ConfigurationEndpoint.Address) } if cluster.ConfigurationEndpoint.Port != nil { commonLabels[elasticacheLabelCacheClusterConfigurationEndpointPort] = model.LabelValue(strconv.Itoa(int(*cluster.ConfigurationEndpoint.Port))) } } if cluster.Engine != nil { commonLabels[elasticacheLabelCacheClusterEngine] = model.LabelValue(*cluster.Engine) } if cluster.EngineVersion != nil { commonLabels[elasticacheLabelCacheClusterEngineVersion] = model.LabelValue(*cluster.EngineVersion) } if len(cluster.IpDiscovery) > 0 { commonLabels[elasticacheLabelCacheClusterIPDiscovery] = model.LabelValue(cluster.IpDiscovery) } if len(cluster.NetworkType) > 0 { commonLabels[elasticacheLabelCacheClusterNetworkType] = model.LabelValue(cluster.NetworkType) } if cluster.NotificationConfiguration != nil { if cluster.NotificationConfiguration.TopicArn != nil { commonLabels[elasticacheLabelCacheClusterNotificationTopicARN] = model.LabelValue(*cluster.NotificationConfiguration.TopicArn) } if cluster.NotificationConfiguration.TopicStatus != nil { commonLabels[elasticacheLabelCacheClusterNotificationTopicStatus] = model.LabelValue(*cluster.NotificationConfiguration.TopicStatus) } } if cluster.NumCacheNodes != nil { commonLabels[elasticacheLabelCacheClusterNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*cluster.NumCacheNodes))) } if cluster.PreferredAvailabilityZone != nil { commonLabels[elasticacheLabelCacheClusterPreferredAvailabilityZone] = model.LabelValue(*cluster.PreferredAvailabilityZone) } if cluster.PreferredMaintenanceWindow != nil { commonLabels[elasticacheLabelCacheClusterPreferredMaintenanceWindow] = model.LabelValue(*cluster.PreferredMaintenanceWindow) } if cluster.PreferredOutpostArn != nil { commonLabels[elasticacheLabelCacheClusterPreferredOutpostARN] = model.LabelValue(*cluster.PreferredOutpostArn) } if cluster.ReplicationGroupId != nil { commonLabels[elasticacheLabelCacheClusterReplicationGroupID] = model.LabelValue(*cluster.ReplicationGroupId) } if cluster.ReplicationGroupLogDeliveryEnabled != nil { commonLabels[elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled] = model.LabelValue(strconv.FormatBool(*cluster.ReplicationGroupLogDeliveryEnabled)) } if cluster.SnapshotRetentionLimit != nil { commonLabels[elasticacheLabelCacheClusterSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cluster.SnapshotRetentionLimit))) } if cluster.SnapshotWindow != nil { commonLabels[elasticacheLabelCacheClusterSnapshotWindow] = model.LabelValue(*cluster.SnapshotWindow) } if cluster.TransitEncryptionEnabled != nil { commonLabels[elasticacheLabelCacheClusterTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.TransitEncryptionEnabled)) } if len(cluster.TransitEncryptionMode) > 0 { commonLabels[elasticacheLabelCacheClusterTransitEncryptionMode] = model.LabelValue(cluster.TransitEncryptionMode) } // Log delivery configurations (slice) for i, logDelivery := range cluster.LogDeliveryConfigurations { if len(logDelivery.DestinationType) > 0 { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType, i))] = model.LabelValue(logDelivery.DestinationType) } if len(logDelivery.LogFormat) > 0 { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat, i))] = model.LabelValue(logDelivery.LogFormat) } if len(logDelivery.LogType) > 0 { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogType, i))] = model.LabelValue(logDelivery.LogType) } if len(logDelivery.Status) > 0 { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationStatus, i))] = model.LabelValue(logDelivery.Status) } if logDelivery.Message != nil { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationMessage, i))] = model.LabelValue(*logDelivery.Message) } if logDelivery.DestinationDetails != nil { if logDelivery.DestinationDetails.CloudWatchLogsDetails != nil && logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup != nil { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup, i))] = model.LabelValue(*logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup) } if logDelivery.DestinationDetails.KinesisFirehoseDetails != nil && logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream != nil { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream, i))] = model.LabelValue(*logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream) } } } // Pending modified values if cluster.PendingModifiedValues != nil { if len(cluster.PendingModifiedValues.AuthTokenStatus) > 0 { commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus] = model.LabelValue(cluster.PendingModifiedValues.AuthTokenStatus) } if cluster.PendingModifiedValues.CacheNodeType != nil { commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType] = model.LabelValue(*cluster.PendingModifiedValues.CacheNodeType) } if cluster.PendingModifiedValues.EngineVersion != nil { commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion] = model.LabelValue(*cluster.PendingModifiedValues.EngineVersion) } if cluster.PendingModifiedValues.NumCacheNodes != nil { commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*cluster.PendingModifiedValues.NumCacheNodes))) } if cluster.PendingModifiedValues.TransitEncryptionEnabled != nil { commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.PendingModifiedValues.TransitEncryptionEnabled)) } if len(cluster.PendingModifiedValues.TransitEncryptionMode) > 0 { commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode] = model.LabelValue(cluster.PendingModifiedValues.TransitEncryptionMode) } if len(cluster.PendingModifiedValues.CacheNodeIdsToRemove) > 0 { commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove] = model.LabelValue(strings.Join(cluster.PendingModifiedValues.CacheNodeIdsToRemove, ",")) } } // Security group membership (slice) for i, sg := range cluster.SecurityGroups { if sg.SecurityGroupId != nil { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipID, i))] = model.LabelValue(*sg.SecurityGroupId) } if sg.Status != nil { commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipStatus, i))] = model.LabelValue(*sg.Status) } } // Tags for _, tag := range tags { if tag.Key != nil && tag.Value != nil { commonLabels[model.LabelName(elasticacheLabelCacheClusterTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value) } } // Create one target per cache node for _, node := range cluster.CacheNodes { // Clone common labels for this node labels := make(model.LabelSet, len(commonLabels)) maps.Copy(labels, commonLabels) // Add node-specific labels if node.CacheNodeId != nil { labels[elasticacheLabelCacheClusterNodeID] = model.LabelValue(*node.CacheNodeId) } if node.CacheNodeStatus != nil { labels[elasticacheLabelCacheClusterNodeStatus] = model.LabelValue(*node.CacheNodeStatus) } if node.CacheNodeCreateTime != nil { labels[elasticacheLabelCacheClusterNodeCreateTime] = model.LabelValue(node.CacheNodeCreateTime.Format(time.RFC3339)) } if node.CustomerAvailabilityZone != nil { labels[elasticacheLabelCacheClusterNodeAZ] = model.LabelValue(*node.CustomerAvailabilityZone) } if node.CustomerOutpostArn != nil { labels[elasticacheLabelCacheClusterNodeCustomerOutpostARN] = model.LabelValue(*node.CustomerOutpostArn) } if node.SourceCacheNodeId != nil { labels[elasticacheLabelCacheClusterNodeSourceCacheNodeID] = model.LabelValue(*node.SourceCacheNodeId) } if node.ParameterGroupStatus != nil { labels[elasticacheLabelCacheClusterNodeParameterGroupStatus] = model.LabelValue(*node.ParameterGroupStatus) } if node.Endpoint != nil { if node.Endpoint.Address != nil { labels[elasticacheLabelCacheClusterNodeEndpointAddress] = model.LabelValue(*node.Endpoint.Address) } if node.Endpoint.Port != nil { labels[elasticacheLabelCacheClusterNodeEndpointPort] = model.LabelValue(strconv.Itoa(int(*node.Endpoint.Port))) } // Set the address label to this node's endpoint if node.Endpoint.Address != nil && node.Endpoint.Port != nil { labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*node.Endpoint.Address, strconv.Itoa(int(*node.Endpoint.Port)))) } } tg.Targets = append(tg.Targets, labels) } }