diff --git a/discovery/aws/ecs.go b/discovery/aws/ecs.go index 1d5ff366de..9ecfcc44fe 100644 --- a/discovery/aws/ecs.go +++ b/discovery/aws/ecs.go @@ -19,7 +19,9 @@ import ( "fmt" "log/slog" "net" + "slices" "strconv" + "strings" "sync" "time" @@ -273,7 +275,6 @@ func (d *ECSDiscovery) initEcsClient(ctx context.Context) error { // listClusterARNs returns a slice of cluster arns. // This method does not use concurrency as it's a simple paginated call. -// AWS ECS Cluster read actions have burst=50, sustained=20 req/sec limits. func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) { var ( clusterARNs []string @@ -281,7 +282,8 @@ func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) { ) for { resp, err := d.ecs.ListClusters(ctx, &ecs.ListClustersInput{ - NextToken: nextToken, + NextToken: nextToken, + MaxResults: aws.Int32(100), }) if err != nil { return nil, fmt.Errorf("could not list clusters: %w", err) @@ -299,56 +301,61 @@ func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) { } // describeClusters returns a map of cluster ARN to a slice of clusters. -// This method processes clusters in batches without concurrency as it's typically -// a single call handling up to 100 clusters. AWS ECS Cluster read actions have -// burst=50, sustained=20 req/sec limits. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Clusters are described in batches of 100 to respect AWS API limits (DescribeClusters allows up to 100 clusters per call). func (d *ECSDiscovery) describeClusters(ctx context.Context, clusters []string) (map[string]types.Cluster, error) { + mu := sync.Mutex{} clusterMap := make(map[string]types.Cluster) - - // AWS DescribeClusters can handle up to 100 clusters per call - batchSize := 100 - for _, batch := range batchSlice(clusters, batchSize) { - resp, err := d.ecs.DescribeClusters(ctx, &ecs.DescribeClustersInput{ - Clusters: batch, - Include: []types.ClusterField{"TAGS"}, - }) - if err != nil { - d.logger.Error("Failed to describe clusters", "clusters", batch, "error", err) - return nil, fmt.Errorf("could not describe clusters %v: %w", batch, err) - } - - for _, c := range resp.Clusters { - if c.ClusterArn != nil { - clusterMap[*c.ClusterArn] = c + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + for batch := range slices.Chunk(clusters, 100) { + errg.Go(func() error { + resp, err := d.ecs.DescribeClusters(ectx, &ecs.DescribeClustersInput{ + Clusters: batch, + Include: []types.ClusterField{"TAGS"}, + }) + if err != nil { + d.logger.Error("Failed to describe clusters", "clusters", batch, "error", err) + return fmt.Errorf("could not describe clusters %v: %w", batch, err) } - } + + for _, cluster := range resp.Clusters { + if cluster.ClusterArn != nil { + mu.Lock() + clusterMap[*cluster.ClusterArn] = cluster + mu.Unlock() + } + } + return nil + }) } - return clusterMap, nil + return clusterMap, errg.Wait() } // listServiceARNs returns a map of cluster ARN to a slice of service ARNs. // Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. -// AWS ECS Service read actions have burst=100, sustained=20 req/sec limits. +// Services are listed in batches of 100 to respect AWS API limits (ListServices allows up to 100 services per call). func (d *ECSDiscovery) listServiceARNs(ctx context.Context, clusters []string) (map[string][]string, error) { - serviceARNsMu := sync.Mutex{} - serviceARNs := make(map[string][]string) + mu := sync.Mutex{} + services := make(map[string][]string) errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) for _, clusterARN := range clusters { errg.Go(func() error { var nextToken *string - var clusterServiceARNs []string + var serviceARNs []string for { resp, err := d.ecs.ListServices(ectx, &ecs.ListServicesInput{ - Cluster: aws.String(clusterARN), - NextToken: nextToken, + Cluster: aws.String(clusterARN), + NextToken: nextToken, + MaxResults: aws.Int32(100), }) if err != nil { return fmt.Errorf("could not list services for cluster %q: %w", clusterARN, err) } - clusterServiceARNs = append(clusterServiceARNs, resp.ServiceArns...) + serviceARNs = append(serviceARNs, resp.ServiceArns...) if resp.NextToken == nil { break @@ -356,75 +363,76 @@ func (d *ECSDiscovery) listServiceARNs(ctx context.Context, clusters []string) ( nextToken = resp.NextToken } - serviceARNsMu.Lock() - serviceARNs[clusterARN] = clusterServiceARNs - serviceARNsMu.Unlock() + mu.Lock() + services[clusterARN] = serviceARNs + mu.Unlock() return nil }) } - return serviceARNs, errg.Wait() -} - -// describeServices returns a map of cluster ARN to services. -// Uses concurrent requests with batching (10 services per request) to respect AWS API limits. -// AWS ECS Service read actions have burst=100, sustained=20 req/sec limits. -func (d *ECSDiscovery) describeServices(ctx context.Context, clusterServiceARNsMap map[string][]string) (map[string][]types.Service, error) { - batchSize := 10 // AWS DescribeServices API limit is 10 services per request - serviceMu := sync.Mutex{} - services := make(map[string][]types.Service) - errg, ectx := errgroup.WithContext(ctx) - errg.SetLimit(d.cfg.RequestConcurrency) - for clusterARN, serviceARNs := range clusterServiceARNsMap { - for _, batch := range batchSlice(serviceARNs, batchSize) { - errg.Go(func() error { - resp, err := d.ecs.DescribeServices(ectx, &ecs.DescribeServicesInput{ - Services: batch, - Cluster: aws.String(clusterARN), - Include: []types.ServiceField{"TAGS"}, - }) - if err != nil { - d.logger.Error("Failed to describe services", "cluster", clusterARN, "batch", batch, "error", err) - return fmt.Errorf("could not describe services for cluster %q: %w", clusterARN, err) - } - - serviceMu.Lock() - services[clusterARN] = append(services[clusterARN], resp.Services...) - serviceMu.Unlock() - - return nil - }) - } - } - return services, errg.Wait() } -// listTaskARNs returns a map of service ARN to a slice of task ARNs. +// describeServices returns a map of service name to service. // Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. -// AWS ECS Cluster resource read actions have burst=100, sustained=20 req/sec limits. -func (d *ECSDiscovery) listTaskARNs(ctx context.Context, services []types.Service) (map[string][]string, error) { - taskARNsMu := sync.Mutex{} - taskARNs := make(map[string][]string) +// Services are described in batches of 10 to respect AWS API limits (DescribeServices allows up to 10 services per call). +func (d *ECSDiscovery) describeServices(ctx context.Context, clusterARN string, serviceARNS []string) (map[string]types.Service, error) { + mu := sync.Mutex{} + services := make(map[string]types.Service) errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) - for _, service := range services { + for batch := range slices.Chunk(serviceARNS, 10) { errg.Go(func() error { - serviceArn := aws.ToString(service.ServiceArn) + resp, err := d.ecs.DescribeServices(ectx, &ecs.DescribeServicesInput{ + Cluster: aws.String(clusterARN), + Services: batch, + Include: []types.ServiceField{"TAGS"}, + }) + if err != nil { + d.logger.Error("Failed to describe services", "cluster", clusterARN, "batch", batch, "error", err) + return fmt.Errorf("could not describe services for cluster %q: batch %v: %w", clusterARN, batch, err) + } - var nextToken *string - var serviceTaskARNs []string + for _, service := range resp.Services { + if service.ServiceArn != nil { + mu.Lock() + services[*service.ServiceName] = service + mu.Unlock() + } + } + return nil + }) + } + + return services, errg.Wait() +} + +// listTaskARNs returns a map of clustersARN to a slice of task ARNs. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Tasks are listed in batches of 100 to respect AWS API limits (ListTasks allows up to 100 tasks per call). +// This method also uses pagination to handle cases where there are more than 100 tasks in a cluster. +func (d *ECSDiscovery) listTaskARNs(ctx context.Context, clusterARNs []string) (map[string][]string, error) { + mu := sync.Mutex{} + tasks := make(map[string][]string) + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + for _, clusterARN := range clusterARNs { + errg.Go(func() error { + var ( + nextToken *string + taskARNs []string + ) for { resp, err := d.ecs.ListTasks(ectx, &ecs.ListTasksInput{ - Cluster: aws.String(*service.ClusterArn), - ServiceName: aws.String(*service.ServiceName), - NextToken: nextToken, + Cluster: aws.String(clusterARN), + NextToken: nextToken, + MaxResults: aws.Int32(100), }) if err != nil { - return fmt.Errorf("could not list tasks for service %q: %w", serviceArn, err) + return fmt.Errorf("could not list tasks for cluster %q: %w", clusterARN, err) } - serviceTaskARNs = append(serviceTaskARNs, resp.TaskArns...) + taskARNs = append(taskARNs, resp.TaskArns...) if resp.NextToken == nil { break @@ -432,77 +440,87 @@ func (d *ECSDiscovery) listTaskARNs(ctx context.Context, services []types.Servic nextToken = resp.NextToken } - taskARNsMu.Lock() - taskARNs[serviceArn] = serviceTaskARNs - taskARNsMu.Unlock() + mu.Lock() + tasks[clusterARN] = taskARNs + mu.Unlock() return nil }) } - return taskARNs, errg.Wait() + return tasks, errg.Wait() } -// describeTasks returns a map of task arn to a slice task. -// Uses concurrent requests with batching (100 tasks per request) to respect AWS API limits. -// AWS ECS Cluster resource read actions have burst=100, sustained=20 req/sec limits. -func (d *ECSDiscovery) describeTasks(ctx context.Context, clusterARN string, taskARNsMap map[string][]string) (map[string][]types.Task, error) { - batchSize := 100 // AWS DescribeTasks API limit is 100 tasks per request - taskMu := sync.Mutex{} - tasks := make(map[string][]types.Task) +// describeTasks returns a slice of tasks. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Tasks are described in batches of 100 to respect AWS API limits (DescribeTasks allows up to 100 tasks per call). +func (d *ECSDiscovery) describeTasks(ctx context.Context, clusterARN string, taskARNs []string) ([]types.Task, error) { + mu := sync.Mutex{} + var tasks []types.Task errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) - for serviceARN, taskARNs := range taskARNsMap { - for _, batch := range batchSlice(taskARNs, batchSize) { - errg.Go(func() error { - resp, err := d.ecs.DescribeTasks(ectx, &ecs.DescribeTasksInput{ - Cluster: aws.String(clusterARN), - Tasks: batch, - Include: []types.TaskField{"TAGS"}, - }) - if err != nil { - d.logger.Error("Failed to describe tasks", "service", serviceARN, "cluster", clusterARN, "batch", batch, "error", err) - return fmt.Errorf("could not describe tasks for service %q in cluster %q: %w", serviceARN, clusterARN, err) - } - - taskMu.Lock() - tasks[serviceARN] = append(tasks[serviceARN], resp.Tasks...) - taskMu.Unlock() - - return nil + for batch := range slices.Chunk(taskARNs, 100) { + errg.Go(func() error { + resp, err := d.ecs.DescribeTasks(ectx, &ecs.DescribeTasksInput{ + Cluster: aws.String(clusterARN), + Tasks: batch, + Include: []types.TaskField{"TAGS"}, }) - } + if err != nil { + d.logger.Error("Failed to describe tasks", "cluster", clusterARN, "batch", batch, "error", err) + return fmt.Errorf("could not describe tasks in cluster %q: batch %v: %w", clusterARN, batch, err) + } + + mu.Lock() + tasks = append(tasks, resp.Tasks...) + mu.Unlock() + return nil + }) } return tasks, errg.Wait() } // describeContainerInstances returns a map of container instance ARN to EC2 instance ID -// Uses batching to respect AWS API limits (100 container instances per request). -func (d *ECSDiscovery) describeContainerInstances(ctx context.Context, clusterARN string, containerInstanceARNs []string) (map[string]string, error) { +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Container instances are described in batches of 100 to respect AWS API limits (DescribeContainerInstances allows up to 100 container instances per call). +func (d *ECSDiscovery) describeContainerInstances(ctx context.Context, clusterARN string, tasks []types.Task) (map[string]string, error) { + containerInstanceARNs := make([]string, 0, len(tasks)) + for _, task := range tasks { + if task.ContainerInstanceArn != nil { + containerInstanceARNs = append(containerInstanceARNs, *task.ContainerInstanceArn) + } + } + if len(containerInstanceARNs) == 0 { return make(map[string]string), nil } + mu := sync.Mutex{} containerInstToEC2 := make(map[string]string) - batchSize := 100 // AWS API limit - - for _, batch := range batchSlice(containerInstanceARNs, batchSize) { - resp, err := d.ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{ - Cluster: aws.String(clusterARN), - ContainerInstances: batch, - }) - if err != nil { - return nil, fmt.Errorf("could not describe container instances: %w", err) - } - - for _, ci := range resp.ContainerInstances { - if ci.ContainerInstanceArn != nil && ci.Ec2InstanceId != nil { - containerInstToEC2[*ci.ContainerInstanceArn] = *ci.Ec2InstanceId + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + for batch := range slices.Chunk(containerInstanceARNs, 100) { + errg.Go(func() error { + resp, err := d.ecs.DescribeContainerInstances(ectx, &ecs.DescribeContainerInstancesInput{ + Cluster: aws.String(clusterARN), + ContainerInstances: batch, + }) + if err != nil { + return fmt.Errorf("could not describe container instances: %w", err) } - } + + for _, ci := range resp.ContainerInstances { + if ci.ContainerInstanceArn != nil && ci.Ec2InstanceId != nil { + mu.Lock() + containerInstToEC2[*ci.ContainerInstanceArn] = *ci.Ec2InstanceId + mu.Unlock() + } + } + return nil + }) } - return containerInstToEC2, nil + return containerInstToEC2, errg.Wait() } // ec2InstanceInfo holds information retrieved from EC2 DescribeInstances. @@ -515,83 +533,112 @@ type ec2InstanceInfo struct { } // describeEC2Instances returns a map of EC2 instance ID to instance information. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// This method does not use concurrency as it's a simple paginated call. func (d *ECSDiscovery) describeEC2Instances(ctx context.Context, instanceIDs []string) (map[string]ec2InstanceInfo, error) { if len(instanceIDs) == 0 { return make(map[string]ec2InstanceInfo), nil } instanceInfo := make(map[string]ec2InstanceInfo) + var nextToken *string - resp, err := d.ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ - InstanceIds: instanceIDs, - }) - if err != nil { - return nil, fmt.Errorf("could not describe EC2 instances: %w", err) - } + for { + resp, err := d.ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ + InstanceIds: instanceIDs, + NextToken: nextToken, + }) + if err != nil { + return nil, fmt.Errorf("could not describe EC2 instances: %w", err) + } - for _, reservation := range resp.Reservations { - for _, instance := range reservation.Instances { - if instance.InstanceId != nil && instance.PrivateIpAddress != nil { - info := ec2InstanceInfo{ - privateIP: *instance.PrivateIpAddress, - tags: make(map[string]string), - } - if instance.PublicIpAddress != nil { - info.publicIP = *instance.PublicIpAddress - } - if instance.SubnetId != nil { - info.subnetID = *instance.SubnetId - } - if instance.InstanceType != "" { - info.instanceType = string(instance.InstanceType) - } - // Collect EC2 instance tags - for _, tag := range instance.Tags { - if tag.Key != nil && tag.Value != nil { - info.tags[*tag.Key] = *tag.Value + for _, reservation := range resp.Reservations { + for _, instance := range reservation.Instances { + if instance.InstanceId != nil && instance.PrivateIpAddress != nil { + info := ec2InstanceInfo{ + privateIP: *instance.PrivateIpAddress, + tags: make(map[string]string), } + if instance.PublicIpAddress != nil { + info.publicIP = *instance.PublicIpAddress + } + if instance.SubnetId != nil { + info.subnetID = *instance.SubnetId + } + if instance.InstanceType != "" { + info.instanceType = string(instance.InstanceType) + } + // Collect EC2 instance tags + for _, tag := range instance.Tags { + if tag.Key != nil && tag.Value != nil { + info.tags[*tag.Key] = *tag.Value + } + } + instanceInfo[*instance.InstanceId] = info } - instanceInfo[*instance.InstanceId] = info } } + + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken } return instanceInfo, nil } // describeNetworkInterfaces returns a map of ENI ID to public IP address. -func (d *ECSDiscovery) describeNetworkInterfaces(ctx context.Context, eniIDs []string) (map[string]string, error) { +// This is needed to get the public IP for tasks using awsvpc network mode, as the ENI is what gets the public IP, not the EC2 instance. +// This method does not use concurrency as it's a simple paginated call. +func (d *ECSDiscovery) describeNetworkInterfaces(ctx context.Context, tasks []types.Task) (map[string]string, error) { + eniIDs := make([]string, 0, len(tasks)) + + for _, task := range tasks { + for _, attachment := range task.Attachments { + if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { + for _, detail := range attachment.Details { + if detail.Name != nil && *detail.Name == "networkInterfaceId" && detail.Value != nil { + eniIDs = append(eniIDs, *detail.Value) + break + } + } + break + } + } + } + if len(eniIDs) == 0 { return make(map[string]string), nil } eniToPublicIP := make(map[string]string) + var nextToken *string - resp, err := d.ec2.DescribeNetworkInterfaces(ctx, &ec2.DescribeNetworkInterfacesInput{ - NetworkInterfaceIds: eniIDs, - }) - if err != nil { - return nil, fmt.Errorf("could not describe network interfaces: %w", err) - } - - for _, eni := range resp.NetworkInterfaces { - if eni.NetworkInterfaceId != nil && eni.Association != nil && eni.Association.PublicIp != nil { - eniToPublicIP[*eni.NetworkInterfaceId] = *eni.Association.PublicIp + for { + resp, err := d.ec2.DescribeNetworkInterfaces(ctx, &ec2.DescribeNetworkInterfacesInput{ + NetworkInterfaceIds: eniIDs, + NextToken: nextToken, + }) + if err != nil { + return nil, fmt.Errorf("could not describe network interfaces: %w", err) } + + for _, eni := range resp.NetworkInterfaces { + if eni.NetworkInterfaceId != nil && eni.Association != nil && eni.Association.PublicIp != nil { + eniToPublicIP[*eni.NetworkInterfaceId] = *eni.Association.PublicIp + } + } + + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken } return eniToPublicIP, nil } -func batchSlice[T any](a []T, size int) [][]T { - batches := make([][]T, 0, len(a)/size+1) - for i := 0; i < len(a); i += size { - end := min(i+size, len(a)) - batches = append(batches, a[i:end]) - } - return batches -} - func (d *ECSDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { err := d.initEcsClient(ctx) if err != nil { @@ -620,314 +667,338 @@ func (d *ECSDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error Source: d.cfg.Region, } - clusterARNMap, err := d.describeClusters(ctx, clusters) - if err != nil { - return nil, err - } + // Fetch cluster details, service ARNs, and task ARNs in parallel + var ( + clusterMap map[string]types.Cluster + serviceMap map[string][]string + taskMap map[string][]string + ) - clusterServiceARNMap, err := d.listServiceARNs(ctx, clusters) - if err != nil { - return nil, err - } + clusterErrg, clusterCtx := errgroup.WithContext(ctx) + clusterErrg.Go(func() error { + var err error + clusterMap, err = d.describeClusters(clusterCtx, clusters) + return err + }) + clusterErrg.Go(func() error { + var err error + serviceMap, err = d.listServiceARNs(clusterCtx, clusters) + return err + }) + clusterErrg.Go(func() error { + var err error + taskMap, err = d.listTaskARNs(clusterCtx, clusters) + return err + }) - clusterServicesMap, err := d.describeServices(ctx, clusterServiceARNMap) - if err != nil { + if err := clusterErrg.Wait(); err != nil { return nil, err } // Use goroutines to process clusters in parallel var ( - targetsMu sync.Mutex - wg sync.WaitGroup + clusterWg sync.WaitGroup + clusterMu sync.Mutex + clusterTargets []model.LabelSet ) - for clusterArn, clusterServices := range clusterServicesMap { - if len(clusterServices) == 0 { + for clusterARN, taskARNs := range taskMap { + if len(taskARNs) == 0 { continue } - wg.Add(1) - go func(clusterArn string, clusterServices []types.Service) { - defer wg.Done() + clusterWg.Add(1) - serviceTaskARNMap, err := d.listTaskARNs(ctx, clusterServices) - if err != nil { - d.logger.Error("Failed to list task ARNs for cluster", "cluster", clusterArn, "error", err) - return - } + go func(cluster types.Cluster, serviceARNs, taskARNs []string) { + defer clusterWg.Done() - serviceTaskMap, err := d.describeTasks(ctx, clusterArn, serviceTaskARNMap) - if err != nil { - d.logger.Error("Failed to describe tasks for cluster", "cluster", clusterArn, "error", err) - return - } - - // Process services within this cluster in parallel + // Fetch services and tasks in parallel (they're independent) var ( - serviceWg sync.WaitGroup - localTargets []model.LabelSet - localTargetsMu sync.Mutex + services map[string]types.Service + tasks []types.Task ) - for _, clusterService := range clusterServices { - serviceWg.Add(1) - go func(clusterService types.Service) { - defer serviceWg.Done() + resourceErrg, resourceCtx := errgroup.WithContext(ctx) + resourceErrg.Go(func() error { + var err error + services, err = d.describeServices(resourceCtx, *cluster.ClusterArn, serviceARNs) + if err != nil { + d.logger.Error("Failed to describe services for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) + resourceErrg.Go(func() error { + var err error + tasks, err = d.describeTasks(resourceCtx, *cluster.ClusterArn, taskARNs) + if err != nil { + d.logger.Error("Failed to describe tasks for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) - serviceArn := *clusterService.ServiceArn - - if tasks, exists := serviceTaskMap[serviceArn]; exists { - var serviceTargets []model.LabelSet - - // Collect container instance ARNs for all EC2 tasks to get instance type - var containerInstanceARNs []string - taskToContainerInstance := make(map[string]string) - // Collect ENI IDs for awsvpc tasks to get public IPs - var eniIDs []string - taskToENI := make(map[string]string) - - for _, task := range tasks { - // Collect container instance ARN for any task running on EC2 - if task.ContainerInstanceArn != nil { - containerInstanceARNs = append(containerInstanceARNs, *task.ContainerInstanceArn) - taskToContainerInstance[*task.TaskArn] = *task.ContainerInstanceArn - } - - // Collect ENI IDs from awsvpc tasks - for _, attachment := range task.Attachments { - if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { - for _, detail := range attachment.Details { - if detail.Name != nil && *detail.Name == "networkInterfaceId" && detail.Value != nil { - eniIDs = append(eniIDs, *detail.Value) - taskToENI[*task.TaskArn] = *detail.Value - break - } - } - break - } - } - } - - // Batch describe container instances and EC2 instances to get instance type and other metadata - var containerInstToEC2 map[string]string - var ec2InstInfo map[string]ec2InstanceInfo - if len(containerInstanceARNs) > 0 { - var err error - containerInstToEC2, err = d.describeContainerInstances(ctx, clusterArn, containerInstanceARNs) - if err != nil { - d.logger.Error("Failed to describe container instances", "cluster", clusterArn, "error", err) - // Continue processing tasks - } else { - // Collect unique EC2 instance IDs - ec2InstanceIDs := make([]string, 0, len(containerInstToEC2)) - for _, ec2ID := range containerInstToEC2 { - ec2InstanceIDs = append(ec2InstanceIDs, ec2ID) - } - - // Batch describe EC2 instances - ec2InstInfo, err = d.describeEC2Instances(ctx, ec2InstanceIDs) - if err != nil { - d.logger.Error("Failed to describe EC2 instances", "cluster", clusterArn, "error", err) - } - } - } - - // Batch describe ENIs to get public IPs for awsvpc tasks - var eniToPublicIP map[string]string - if len(eniIDs) > 0 { - var err error - eniToPublicIP, err = d.describeNetworkInterfaces(ctx, eniIDs) - if err != nil { - d.logger.Error("Failed to describe network interfaces", "cluster", clusterArn, "error", err) - // Continue processing without ENI public IPs - } - } - - for _, task := range tasks { - var ipAddress, subnetID, publicIP string - var networkMode string - var ec2InstanceID, ec2InstanceType, ec2InstancePrivateIP, ec2InstancePublicIP string - - // Try to get IP from ENI attachment (awsvpc mode) - var eniAttachment *types.Attachment - for _, attachment := range task.Attachments { - if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { - eniAttachment = &attachment - break - } - } - - if eniAttachment != nil { - // awsvpc networking mode - get IP from ENI - networkMode = "awsvpc" - for _, detail := range eniAttachment.Details { - switch *detail.Name { - case "privateIPv4Address": - ipAddress = *detail.Value - case "subnetId": - subnetID = *detail.Value - } - } - // Get public IP from ENI if available - if eniID, ok := taskToENI[*task.TaskArn]; ok { - if eniPublicIP, ok := eniToPublicIP[eniID]; ok { - publicIP = eniPublicIP - } - } - } else if task.ContainerInstanceArn != nil { - // bridge/host networking mode - need to get EC2 instance IP and subnet - networkMode = "bridge" - containerInstARN, ok := taskToContainerInstance[*task.TaskArn] - if ok { - ec2InstanceID, ok = containerInstToEC2[containerInstARN] - if ok { - info, ok := ec2InstInfo[ec2InstanceID] - if ok { - ipAddress = info.privateIP - publicIP = info.publicIP - subnetID = info.subnetID - ec2InstanceType = info.instanceType - ec2InstancePrivateIP = info.privateIP - ec2InstancePublicIP = info.publicIP - } else { - d.logger.Debug("EC2 instance info not found", "instance", ec2InstanceID, "task", *task.TaskArn) - } - } else { - d.logger.Debug("Container instance not found in map", "arn", containerInstARN, "task", *task.TaskArn) - } - } - } - - // Get EC2 instance metadata for awsvpc tasks running on EC2 - // We want the instance type and the host IPs for advanced use cases - if networkMode == "awsvpc" && task.ContainerInstanceArn != nil { - containerInstARN, ok := taskToContainerInstance[*task.TaskArn] - if ok { - ec2InstanceID, ok = containerInstToEC2[containerInstARN] - if ok { - info, ok := ec2InstInfo[ec2InstanceID] - if ok { - ec2InstanceType = info.instanceType - ec2InstancePrivateIP = info.privateIP - ec2InstancePublicIP = info.publicIP - } - } - } - } - - if ipAddress == "" { - continue - } - - labels := model.LabelSet{ - ecsLabelClusterARN: model.LabelValue(*clusterService.ClusterArn), - ecsLabelService: model.LabelValue(*clusterService.ServiceName), - ecsLabelServiceARN: model.LabelValue(*clusterService.ServiceArn), - ecsLabelServiceStatus: model.LabelValue(*clusterService.Status), - ecsLabelTaskGroup: model.LabelValue(*task.Group), - ecsLabelTaskARN: model.LabelValue(*task.TaskArn), - ecsLabelTaskDefinition: model.LabelValue(*task.TaskDefinitionArn), - ecsLabelIPAddress: model.LabelValue(ipAddress), - ecsLabelRegion: model.LabelValue(d.cfg.Region), - ecsLabelLaunchType: model.LabelValue(task.LaunchType), - ecsLabelAvailabilityZone: model.LabelValue(*task.AvailabilityZone), - ecsLabelDesiredStatus: model.LabelValue(*task.DesiredStatus), - ecsLabelLastStatus: model.LabelValue(*task.LastStatus), - ecsLabelHealthStatus: model.LabelValue(task.HealthStatus), - ecsLabelNetworkMode: model.LabelValue(networkMode), - } - - // Add subnet ID when available (awsvpc mode from ENI, bridge/host from EC2 instance) - if subnetID != "" { - labels[ecsLabelSubnetID] = model.LabelValue(subnetID) - } - - // Add container instance and EC2 instance info for EC2 launch type - if task.ContainerInstanceArn != nil { - labels[ecsLabelContainerInstanceARN] = model.LabelValue(*task.ContainerInstanceArn) - } - if ec2InstanceID != "" { - labels[ecsLabelEC2InstanceID] = model.LabelValue(ec2InstanceID) - } - if ec2InstanceType != "" { - labels[ecsLabelEC2InstanceType] = model.LabelValue(ec2InstanceType) - } - if ec2InstancePrivateIP != "" { - labels[ecsLabelEC2InstancePrivateIP] = model.LabelValue(ec2InstancePrivateIP) - } - if ec2InstancePublicIP != "" { - labels[ecsLabelEC2InstancePublicIP] = model.LabelValue(ec2InstancePublicIP) - } - if publicIP != "" { - labels[ecsLabelPublicIP] = model.LabelValue(publicIP) - } - - if task.PlatformFamily != nil { - labels[ecsLabelPlatformFamily] = model.LabelValue(*task.PlatformFamily) - } - if task.PlatformVersion != nil { - labels[ecsLabelPlatformVersion] = model.LabelValue(*task.PlatformVersion) - } - - labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(ipAddress, strconv.Itoa(d.cfg.Port))) - - // Add cluster tags - if cluster, exists := clusterARNMap[*clusterService.ClusterArn]; exists { - if cluster.ClusterName != nil { - labels[ecsLabelCluster] = model.LabelValue(*cluster.ClusterName) - } - - for _, clusterTag := range cluster.Tags { - if clusterTag.Key != nil && clusterTag.Value != nil { - labels[model.LabelName(ecsLabelTagCluster+strutil.SanitizeLabelName(*clusterTag.Key))] = model.LabelValue(*clusterTag.Value) - } - } - } - - // Add service tags - for _, serviceTag := range clusterService.Tags { - if serviceTag.Key != nil && serviceTag.Value != nil { - labels[model.LabelName(ecsLabelTagService+strutil.SanitizeLabelName(*serviceTag.Key))] = model.LabelValue(*serviceTag.Value) - } - } - - // Add task tags - for _, taskTag := range task.Tags { - if taskTag.Key != nil && taskTag.Value != nil { - labels[model.LabelName(ecsLabelTagTask+strutil.SanitizeLabelName(*taskTag.Key))] = model.LabelValue(*taskTag.Value) - } - } - - // Add EC2 instance tags (if running on EC2) - if ec2InstanceID != "" { - if info, ok := ec2InstInfo[ec2InstanceID]; ok { - for tagKey, tagValue := range info.tags { - labels[model.LabelName(ecsLabelTagEC2+strutil.SanitizeLabelName(tagKey))] = model.LabelValue(tagValue) - } - } - } - - serviceTargets = append(serviceTargets, labels) - } - - // Add service targets to local targets with mutex protection - localTargetsMu.Lock() - localTargets = append(localTargets, serviceTargets...) - localTargetsMu.Unlock() - } - }(clusterService) + if err := resourceErrg.Wait(); err != nil { + return } - serviceWg.Wait() + // Fetch container instances and network interfaces in parallel (both depend on tasks) + var ( + containerInstances map[string]string + eniToPublicIP map[string]string + ) - // Add all local targets to main target group with mutex protection - targetsMu.Lock() - tg.Targets = append(tg.Targets, localTargets...) - targetsMu.Unlock() - }(clusterArn, clusterServices) + instanceErrg, instanceCtx := errgroup.WithContext(ctx) + instanceErrg.Go(func() error { + var err error + containerInstances, err = d.describeContainerInstances(instanceCtx, *cluster.ClusterArn, tasks) + if err != nil { + d.logger.Error("Failed to describe container instances for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) + instanceErrg.Go(func() error { + var err error + eniToPublicIP, err = d.describeNetworkInterfaces(instanceCtx, tasks) + if err != nil { + d.logger.Error("Failed to describe network interfaces for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) + + if err := instanceErrg.Wait(); err != nil { + return + } + + ec2Instances := make(map[string]ec2InstanceInfo) + if len(containerInstances) > 0 { + // Deduplicate EC2 instance IDs (multiple tasks can share the same instance) + ec2InstanceIDSet := make(map[string]struct{}) + for _, ec2ID := range containerInstances { + ec2InstanceIDSet[ec2ID] = struct{}{} + } + ec2InstanceIDs := make([]string, 0, len(ec2InstanceIDSet)) + for ec2ID := range ec2InstanceIDSet { + ec2InstanceIDs = append(ec2InstanceIDs, ec2ID) + } + ec2Instances, err = d.describeEC2Instances(ctx, ec2InstanceIDs) + if err != nil { + d.logger.Error("Failed to describe EC2 instances for cluster", "cluster", *cluster.ClusterArn, "error", err) + return + } + } + + var ( + taskWg sync.WaitGroup + taskMu sync.Mutex + taskTargets []model.LabelSet + ) + + for _, task := range tasks { + taskWg.Add(1) + + go func(cluster types.Cluster, services map[string]types.Service, task types.Task, containerInstances map[string]string, ec2Instances map[string]ec2InstanceInfo, eniToPublicIP map[string]string) { + defer taskWg.Done() + + var ( + ipAddress, subnetID, publicIP string + networkMode string + ec2InstanceID, ec2InstanceType, ec2InstancePrivateIP, ec2InstancePublicIP string + ) + + // Try to get IP from ENI attachment (awsvpc mode) + var eniAttachment *types.Attachment + for _, attachment := range task.Attachments { + if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { + eniAttachment = &attachment + break + } + } + + if eniAttachment != nil { + // awsvpc networking mode - get IP from ENI + networkMode = "awsvpc" + var eniID string + for _, detail := range eniAttachment.Details { + switch *detail.Name { + case "privateIPv4Address": + ipAddress = *detail.Value + case "subnetId": + subnetID = *detail.Value + case "networkInterfaceId": + eniID = *detail.Value + } + } + // Get public IP from ENI if available + if eniID != "" { + if pub, ok := eniToPublicIP[eniID]; ok { + publicIP = pub + } + } + } else if task.ContainerInstanceArn != nil { + // bridge/host networking mode - need to get EC2 instance IP and subnet + networkMode = "bridge" + var ok bool + ec2InstanceID, ok = containerInstances[*task.ContainerInstanceArn] + if ok { + info, ok := ec2Instances[ec2InstanceID] + if ok { + ipAddress = info.privateIP + publicIP = info.publicIP + subnetID = info.subnetID + ec2InstanceType = info.instanceType + ec2InstancePrivateIP = info.privateIP + ec2InstancePublicIP = info.publicIP + } else { + d.logger.Debug("EC2 instance info not found", "instance", ec2InstanceID, "task", *task.TaskArn) + } + } else { + d.logger.Debug("Container instance not found in map", "arn", *task.ContainerInstanceArn, "task", *task.TaskArn) + } + } + + // Get EC2 instance metadata for awsvpc tasks running on EC2 + // We want the instance type and the host IPs for advanced use cases + if networkMode == "awsvpc" && task.ContainerInstanceArn != nil { + var ok bool + ec2InstanceID, ok = containerInstances[*task.ContainerInstanceArn] + if ok { + info, ok := ec2Instances[ec2InstanceID] + if ok { + ec2InstanceType = info.instanceType + ec2InstancePrivateIP = info.privateIP + ec2InstancePublicIP = info.publicIP + } + } + } + + if ipAddress == "" { + return + } + + labels := model.LabelSet{ + ecsLabelClusterARN: model.LabelValue(*cluster.ClusterArn), + ecsLabelCluster: model.LabelValue(*cluster.ClusterName), + ecsLabelTaskGroup: model.LabelValue(*task.Group), + ecsLabelTaskARN: model.LabelValue(*task.TaskArn), + ecsLabelTaskDefinition: model.LabelValue(*task.TaskDefinitionArn), + ecsLabelIPAddress: model.LabelValue(ipAddress), + ecsLabelRegion: model.LabelValue(d.cfg.Region), + ecsLabelLaunchType: model.LabelValue(task.LaunchType), + ecsLabelAvailabilityZone: model.LabelValue(*task.AvailabilityZone), + ecsLabelDesiredStatus: model.LabelValue(*task.DesiredStatus), + ecsLabelLastStatus: model.LabelValue(*task.LastStatus), + ecsLabelHealthStatus: model.LabelValue(task.HealthStatus), + ecsLabelNetworkMode: model.LabelValue(networkMode), + } + + // Add subnet ID when available (awsvpc mode from ENI, bridge/host from EC2 instance) + if subnetID != "" { + labels[ecsLabelSubnetID] = model.LabelValue(subnetID) + } + + // Add container instance and EC2 instance info for EC2 launch type + if task.ContainerInstanceArn != nil { + labels[ecsLabelContainerInstanceARN] = model.LabelValue(*task.ContainerInstanceArn) + } + if ec2InstanceID != "" { + labels[ecsLabelEC2InstanceID] = model.LabelValue(ec2InstanceID) + } + if ec2InstanceType != "" { + labels[ecsLabelEC2InstanceType] = model.LabelValue(ec2InstanceType) + } + if ec2InstancePrivateIP != "" { + labels[ecsLabelEC2InstancePrivateIP] = model.LabelValue(ec2InstancePrivateIP) + } + if ec2InstancePublicIP != "" { + labels[ecsLabelEC2InstancePublicIP] = model.LabelValue(ec2InstancePublicIP) + } + if publicIP != "" { + labels[ecsLabelPublicIP] = model.LabelValue(publicIP) + } + + if task.PlatformFamily != nil { + labels[ecsLabelPlatformFamily] = model.LabelValue(*task.PlatformFamily) + } + if task.PlatformVersion != nil { + labels[ecsLabelPlatformVersion] = model.LabelValue(*task.PlatformVersion) + } + + labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(ipAddress, strconv.Itoa(d.cfg.Port))) + + // Add cluster tags + for _, clusterTag := range cluster.Tags { + if clusterTag.Key != nil && clusterTag.Value != nil { + labels[model.LabelName(ecsLabelTagCluster+strutil.SanitizeLabelName(*clusterTag.Key))] = model.LabelValue(*clusterTag.Value) + } + } + + // If this is not a standalone task, add service information and tags + if !isStandaloneTask(task) { + service, ok := services[getServiceNameFromTaskGroup(task)] + if !ok { + d.logger.Debug("Service not found for task", "task", *task.TaskArn, "service", getServiceNameFromTaskGroup(task)) + } + if service.ServiceName != nil { + labels[ecsLabelService] = model.LabelValue(*service.ServiceName) + } + if service.ServiceArn != nil { + labels[ecsLabelServiceARN] = model.LabelValue(*service.ServiceArn) + } + if service.Status != nil { + labels[ecsLabelServiceStatus] = model.LabelValue(*service.Status) + } + + // Add service tags + for _, serviceTag := range service.Tags { + if serviceTag.Key != nil && serviceTag.Value != nil { + labels[model.LabelName(ecsLabelTagService+strutil.SanitizeLabelName(*serviceTag.Key))] = model.LabelValue(*serviceTag.Value) + } + } + } + + // Add task tags + for _, taskTag := range task.Tags { + if taskTag.Key != nil && taskTag.Value != nil { + labels[model.LabelName(ecsLabelTagTask+strutil.SanitizeLabelName(*taskTag.Key))] = model.LabelValue(*taskTag.Value) + } + } + + // Add EC2 instance tags (if running on EC2) + if ec2InstanceID != "" { + if info, ok := ec2Instances[ec2InstanceID]; ok { + for tagKey, tagValue := range info.tags { + labels[model.LabelName(ecsLabelTagEC2+strutil.SanitizeLabelName(tagKey))] = model.LabelValue(tagValue) + } + } + } + + taskMu.Lock() + taskTargets = append(taskTargets, labels) + taskMu.Unlock() + }(cluster, services, task, containerInstances, ec2Instances, eniToPublicIP) + } + + taskWg.Wait() + + // Add this cluster's task targets to the overall collection + clusterMu.Lock() + clusterTargets = append(clusterTargets, taskTargets...) + clusterMu.Unlock() + }(clusterMap[clusterARN], serviceMap[clusterARN], taskARNs) } - wg.Wait() + clusterWg.Wait() + + // Set all targets to the target group + tg.Targets = clusterTargets return []*targetgroup.Group{tg}, nil } + +func isStandaloneTask(task types.Task) bool { + // A standalone task will have a group of "family:task-def-name" + return task.Group != nil && strings.HasPrefix(*task.Group, "family:") +} + +func getServiceNameFromTaskGroup(task types.Task) string { + return strings.Split(*task.Group, ":")[1] +} diff --git a/discovery/aws/ecs_test.go b/discovery/aws/ecs_test.go index 1cb48b27fa..bb1f96a28e 100644 --- a/discovery/aws/ecs_test.go +++ b/discovery/aws/ecs_test.go @@ -214,7 +214,6 @@ func TestECSDiscoveryDescribeClusters(t *testing.T) { func TestECSDiscoveryListServiceARNs(t *testing.T) { ctx := context.Background() - // iterate through the test cases for _, tt := range []struct { name string ecsData *ecsDataStore @@ -225,33 +224,18 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { name: "SingleClusterWithServices", ecsData: &ecsDataStore{ region: "us-west-2", - clusters: []ecsTypes.Cluster{ - { - ClusterName: strptr("test-cluster"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("ACTIVE"), - }, - }, services: []ecsTypes.Service{ { ServiceName: strptr("web-service"), ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), + Status: strptr("ACTIVE"), }, { ServiceName: strptr("api-service"), ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - { - // this is to test the old arn format without the cluster name in the service arn - // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-arn-migration.html - ServiceName: strptr("old-api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/old-api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), + Status: strptr("ACTIVE"), }, }, }, @@ -260,70 +244,50 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service", "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service", - "arn:aws:ecs:us-west-2:123456789012:service/old-api-service", }, }, }, { - name: "MultipleClustesWithServices", + name: "MultipleClusters", ecsData: &ecsDataStore{ - region: "us-east-1", - clusters: []ecsTypes.Cluster{ - { - ClusterName: strptr("cluster-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("ACTIVE"), - }, - { - ClusterName: strptr("cluster-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("ACTIVE"), - }, - }, + region: "us-west-2", services: []ecsTypes.Service{ { - ServiceName: strptr("service-1"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("RUNNING"), + ServiceName: strptr("web-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/cluster-1/web-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/cluster-1"), + Status: strptr("ACTIVE"), }, { - ServiceName: strptr("service-2"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("RUNNING"), + ServiceName: strptr("api-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/cluster-2/api-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/cluster-2"), + Status: strptr("ACTIVE"), }, }, }, clusterARNs: []string{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1", - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-1", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-2", }, expected: map[string][]string{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-1": { + "arn:aws:ecs:us-west-2:123456789012:service/cluster-1/web-service", }, - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-2": { + "arn:aws:ecs:us-west-2:123456789012:service/cluster-2/api-service", }, }, }, { - name: "ClusterWithNoServices", + name: "EmptyCluster", ecsData: &ecsDataStore{ - region: "us-west-2", - clusters: []ecsTypes.Cluster{ - { - ClusterName: strptr("empty-cluster"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/empty-cluster"), - Status: strptr("ACTIVE"), - }, - }, + region: "us-west-2", services: []ecsTypes.Service{}, }, - clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/empty-cluster"}, + clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"}, expected: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:cluster/empty-cluster": nil, + "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": nil, }, }, } { @@ -334,7 +298,7 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { ecs: client, cfg: &ECSSDConfig{ Region: tt.ecsData.region, - RequestConcurrency: 1, + RequestConcurrency: 2, }, } @@ -348,113 +312,178 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { func TestECSDiscoveryDescribeServices(t *testing.T) { ctx := context.Background() - // iterate through the test cases for _, tt := range []struct { - name string - ecsData *ecsDataStore - clusterServiceARNsMap map[string][]string - expected map[string][]ecsTypes.Service + name string + ecsData *ecsDataStore + clusterARN string + serviceARNs []string + expected map[string]ecsTypes.Service }{ { - name: "SingleClusterServices", + name: "ServicesWithTags", ecsData: &ecsDataStore{ region: "us-west-2", services: []ecsTypes.Service{ { - ServiceName: strptr("web-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), + ServiceName: strptr("web-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), Tags: []ecsTypes.Tag{ {Key: strptr("Environment"), Value: strptr("production")}, + {Key: strptr("Team"), Value: strptr("platform")}, }, }, { - ServiceName: strptr("api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), - }, - }, - }, - clusterServiceARNsMap: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service", - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service", - }, - }, - expected: map[string][]ecsTypes.Service{ - "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { - { - ServiceName: strptr("web-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), + ServiceName: strptr("api-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), Tags: []ecsTypes.Tag{ - {Key: strptr("Environment"), Value: strptr("production")}, + {Key: strptr("Environment"), Value: strptr("staging")}, }, }, - { - ServiceName: strptr("api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), + }, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + serviceARNs: []string{ + "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service", + "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service", + }, + expected: map[string]ecsTypes.Service{ + "web-service": { + ServiceName: strptr("web-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), + Tags: []ecsTypes.Tag{ + {Key: strptr("Environment"), Value: strptr("production")}, + {Key: strptr("Team"), Value: strptr("platform")}, + }, + }, + "api-service": { + ServiceName: strptr("api-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), + Tags: []ecsTypes.Tag{ + {Key: strptr("Environment"), Value: strptr("staging")}, }, }, }, }, { - name: "MultipleClustersServices", + name: "EmptyServiceList", ecsData: &ecsDataStore{ - region: "us-east-1", - services: []ecsTypes.Service{ + region: "us-west-2", + services: []ecsTypes.Service{}, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + serviceARNs: []string{}, + expected: map[string]ecsTypes.Service{}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + client := newMockECSClient(tt.ecsData) + + d := &ECSDiscovery{ + ecs: client, + cfg: &ECSSDConfig{ + Region: tt.ecsData.region, + RequestConcurrency: 2, + }, + } + + services, err := d.describeServices(ctx, tt.clusterARN, tt.serviceARNs) + require.NoError(t, err) + require.Equal(t, tt.expected, services) + }) + } +} + +func TestECSDiscoveryDescribeContainerInstances(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecsData *ecsDataStore + clusterARN string + tasks []ecsTypes.Task + expected map[string]string + }{ + { + name: "EC2Tasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + containerInstances: []ecsTypes.ContainerInstance{ { - ServiceName: strptr("service-1"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-1:1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + Ec2InstanceId: strptr("i-1234567890abcdef0"), }, { - ServiceName: strptr("service-2"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("DRAINING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-2:1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/xyz789"), + Ec2InstanceId: strptr("i-0987654321fedcba0"), }, }, }, - clusterServiceARNsMap: map[string][]string{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1", + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + LaunchType: ecsTypes.LaunchTypeEc2, }, - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2", + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/xyz789"), + LaunchType: ecsTypes.LaunchTypeEc2, }, }, - expected: map[string][]ecsTypes.Service{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1": { + expected: map[string]string{ + "arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123": "i-1234567890abcdef0", + "arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/xyz789": "i-0987654321fedcba0", + }, + }, + { + name: "FargateTasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + containerInstances: []ecsTypes.ContainerInstance{}, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeFargate, + }, + }, + expected: map[string]string{}, + }, + { + name: "MixedTasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + containerInstances: []ecsTypes.ContainerInstance{ { - ServiceName: strptr("service-1"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-1:1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + Ec2InstanceId: strptr("i-1234567890abcdef0"), }, }, - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2": { - { - ServiceName: strptr("service-2"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("DRAINING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-2:1"), - }, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-ec2"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + LaunchType: ecsTypes.LaunchTypeEc2, }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-fargate"), + LaunchType: ecsTypes.LaunchTypeFargate, + }, + }, + expected: map[string]string{ + "arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123": "i-1234567890abcdef0", }, }, } { @@ -465,13 +494,267 @@ func TestECSDiscoveryDescribeServices(t *testing.T) { ecs: client, cfg: &ECSSDConfig{ Region: tt.ecsData.region, - RequestConcurrency: 1, + RequestConcurrency: 2, }, } - serviceMap, err := d.describeServices(ctx, tt.clusterServiceARNsMap) + containerInstances, err := d.describeContainerInstances(ctx, tt.clusterARN, tt.tasks) require.NoError(t, err) - require.Equal(t, tt.expected, serviceMap) + require.Equal(t, tt.expected, containerInstances) + }) + } +} + +func TestECSDiscoveryDescribeEC2Instances(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecsData *ecsDataStore + instanceIDs []string + expected map[string]ec2InstanceInfo + }{ + { + name: "InstancesWithTags", + ecsData: &ecsDataStore{ + region: "us-west-2", + ec2Instances: map[string]ec2InstanceInfo{ + "i-1234567890abcdef0": { + privateIP: "10.0.1.50", + publicIP: "54.1.2.3", + subnetID: "subnet-12345", + instanceType: "t3.medium", + tags: map[string]string{ + "Name": "ecs-host-1", + "Environment": "production", + }, + }, + "i-0987654321fedcba0": { + privateIP: "10.0.1.75", + publicIP: "54.2.3.4", + subnetID: "subnet-67890", + instanceType: "t3.large", + tags: map[string]string{ + "Name": "ecs-host-2", + "Team": "platform", + }, + }, + }, + }, + instanceIDs: []string{"i-1234567890abcdef0", "i-0987654321fedcba0"}, + expected: map[string]ec2InstanceInfo{ + "i-1234567890abcdef0": { + privateIP: "10.0.1.50", + publicIP: "54.1.2.3", + subnetID: "subnet-12345", + instanceType: "t3.medium", + tags: map[string]string{ + "Name": "ecs-host-1", + "Environment": "production", + }, + }, + "i-0987654321fedcba0": { + privateIP: "10.0.1.75", + publicIP: "54.2.3.4", + subnetID: "subnet-67890", + instanceType: "t3.large", + tags: map[string]string{ + "Name": "ecs-host-2", + "Team": "platform", + }, + }, + }, + }, + { + name: "EmptyList", + ecsData: &ecsDataStore{ + region: "us-west-2", + ec2Instances: map[string]ec2InstanceInfo{}, + }, + instanceIDs: []string{}, + expected: map[string]ec2InstanceInfo{}, + }, + { + name: "InstanceWithoutPublicIP", + ecsData: &ecsDataStore{ + region: "us-west-2", + ec2Instances: map[string]ec2InstanceInfo{ + "i-privateonly": { + privateIP: "10.0.1.100", + publicIP: "", + subnetID: "subnet-private", + instanceType: "t3.micro", + tags: map[string]string{}, + }, + }, + }, + instanceIDs: []string{"i-privateonly"}, + expected: map[string]ec2InstanceInfo{ + "i-privateonly": { + privateIP: "10.0.1.100", + publicIP: "", + subnetID: "subnet-private", + instanceType: "t3.micro", + tags: map[string]string{}, + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ec2Client := newMockECSEC2Client(tt.ecsData.ec2Instances, nil) + + d := &ECSDiscovery{ + ec2: ec2Client, + cfg: &ECSSDConfig{ + Region: tt.ecsData.region, + RequestConcurrency: 2, + }, + } + + instances, err := d.describeEC2Instances(ctx, tt.instanceIDs) + require.NoError(t, err) + require.Equal(t, tt.expected, instances) + }) + } +} + +func TestECSDiscoveryDescribeNetworkInterfaces(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecsData *ecsDataStore + tasks []ecsTypes.Task + expected map[string]string + }{ + { + name: "AwsvpcTasksWithPublicIPs", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{ + "eni-12345": "52.1.2.3", + "eni-67890": "52.2.3.4", + }, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-12345")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.100")}, + }, + }, + }, + }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-67890")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.200")}, + }, + }, + }, + }, + }, + expected: map[string]string{ + "eni-12345": "52.1.2.3", + "eni-67890": "52.2.3.4", + }, + }, + { + name: "AwsvpcTasksWithoutPublicIPs", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{}, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-private")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.100")}, + }, + }, + }, + }, + }, + expected: map[string]string{}, + }, + { + name: "BridgeTasksNoENI", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{}, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeEc2, + // No ENI attachment for bridge networking + Attachments: []ecsTypes.Attachment{}, + }, + }, + expected: map[string]string{}, + }, + { + name: "MixedTasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{ + "eni-fargate": "52.1.2.3", + }, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-fargate"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-fargate")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.100")}, + }, + }, + }, + }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-bridge"), + LaunchType: ecsTypes.LaunchTypeEc2, + Attachments: []ecsTypes.Attachment{}, + }, + }, + expected: map[string]string{ + "eni-fargate": "52.1.2.3", + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ec2Client := newMockECSEC2Client(nil, tt.ecsData.eniPublicIPs) + + d := &ECSDiscovery{ + ec2: ec2Client, + cfg: &ECSSDConfig{ + Region: tt.ecsData.region, + RequestConcurrency: 2, + }, + } + + eniMap, err := d.describeNetworkInterfaces(ctx, tt.tasks) + require.NoError(t, err) + require.Equal(t, tt.expected, eniMap) }) } } @@ -481,13 +764,13 @@ func TestECSDiscoveryListTaskARNs(t *testing.T) { // iterate through the test cases for _, tt := range []struct { - name string - ecsData *ecsDataStore - services []ecsTypes.Service - expected map[string][]string + name string + ecsData *ecsDataStore + clusterARNs []string + expected map[string][]string }{ { - name: "ServicesWithTasks", + name: "TasksInCluster", ecsData: &ecsDataStore{ region: "us-west-2", tasks: []ecsTypes.Task{ @@ -511,46 +794,24 @@ func TestECSDiscoveryListTaskARNs(t *testing.T) { }, }, }, - services: []ecsTypes.Service{ - { - ServiceName: strptr("web-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - { - ServiceName: strptr("api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - }, + clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"}, expected: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service": { + "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1", "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2", - }, - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service": { "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-3", }, }, }, { - name: "ServiceWithNoTasks", + name: "EmptyCluster", ecsData: &ecsDataStore{ region: "us-west-2", tasks: []ecsTypes.Task{}, }, - services: []ecsTypes.Service{ - { - ServiceName: strptr("empty-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/empty-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - }, + clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"}, expected: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/empty-service": nil, + "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": nil, }, }, } { @@ -565,7 +826,7 @@ func TestECSDiscoveryListTaskARNs(t *testing.T) { }, } - taskMap, err := d.listTaskARNs(ctx, tt.services) + taskMap, err := d.listTaskARNs(ctx, tt.clusterARNs) require.NoError(t, err) require.Equal(t, tt.expected, taskMap) }) @@ -577,11 +838,11 @@ func TestECSDiscoveryDescribeTasks(t *testing.T) { // iterate through the test cases for _, tt := range []struct { - name string - ecsData *ecsDataStore - clusterARN string - taskARNsMap map[string][]string - expected map[string][]ecsTypes.Task + name string + ecsData *ecsDataStore + clusterARN string + taskARNs []string + expected []ecsTypes.Task }{ { name: "TasksInCluster", @@ -608,47 +869,39 @@ func TestECSDiscoveryDescribeTasks(t *testing.T) { }, }, clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", - taskARNsMap: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service": { - "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1", - }, - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service": { - "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2", - }, + taskARNs: []string{ + "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1", + "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2", }, - expected: map[string][]ecsTypes.Task{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service": { - { - TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Group: strptr("service:web-service"), - TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), - LastStatus: strptr("RUNNING"), - Tags: []ecsTypes.Tag{ - {Key: strptr("Environment"), Value: strptr("production")}, - }, + expected: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Group: strptr("service:web-service"), + TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), + LastStatus: strptr("RUNNING"), + Tags: []ecsTypes.Tag{ + {Key: strptr("Environment"), Value: strptr("production")}, }, }, - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service": { - { - TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Group: strptr("service:api-service"), - TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), - LastStatus: strptr("RUNNING"), - }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Group: strptr("service:api-service"), + TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), + LastStatus: strptr("RUNNING"), }, }, }, { - name: "EmptyTaskARNsMap", + name: "EmptyTaskList", ecsData: &ecsDataStore{ region: "us-west-2", tasks: []ecsTypes.Task{}, }, - clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", - taskARNsMap: map[string][]string{}, - expected: map[string][]ecsTypes.Task{}, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + taskARNs: []string{}, + expected: nil, }, } { t.Run(tt.name, func(t *testing.T) { @@ -662,9 +915,9 @@ func TestECSDiscoveryDescribeTasks(t *testing.T) { }, } - taskMap, err := d.describeTasks(ctx, tt.clusterARN, tt.taskARNsMap) + tasks, err := d.describeTasks(ctx, tt.clusterARN, tt.taskARNs) require.NoError(t, err) - require.Equal(t, tt.expected, taskMap) + require.Equal(t, tt.expected, tasks) }) } } @@ -836,6 +1089,75 @@ func TestECSDiscoveryRefresh(t *testing.T) { }, }, }, + { + name: "StandaloneTaskNoService", + ecsData: &ecsDataStore{ + region: "us-west-2", + clusters: []ecsTypes.Cluster{ + { + ClusterName: strptr("standalone-cluster"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/standalone-cluster"), + Status: strptr("ACTIVE"), + }, + }, + services: []ecsTypes.Service{}, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/standalone-cluster/task-standalone"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/standalone-cluster"), + TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/standalone-task:1"), + Group: strptr("family:standalone-task"), + LaunchType: ecsTypes.LaunchTypeFargate, + LastStatus: strptr("RUNNING"), + DesiredStatus: strptr("RUNNING"), + HealthStatus: ecsTypes.HealthStatusHealthy, + AvailabilityZone: strptr("us-west-2a"), + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("subnetId"), Value: strptr("subnet-standalone-1")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.4.10")}, + {Name: strptr("networkInterfaceId"), Value: strptr("eni-standalone-123")}, + }, + }, + }, + Tags: []ecsTypes.Tag{ + {Key: strptr("Role"), Value: strptr("batch")}, + }, + }, + }, + eniPublicIPs: map[string]string{ + "eni-standalone-123": "52.4.5.6", + }, + }, + expected: []*targetgroup.Group{ + { + Source: "us-west-2", + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("10.0.4.10:80"), + "__meta_ecs_cluster": model.LabelValue("standalone-cluster"), + "__meta_ecs_cluster_arn": model.LabelValue("arn:aws:ecs:us-west-2:123456789012:cluster/standalone-cluster"), + "__meta_ecs_task_group": model.LabelValue("family:standalone-task"), + "__meta_ecs_task_arn": model.LabelValue("arn:aws:ecs:us-west-2:123456789012:task/standalone-cluster/task-standalone"), + "__meta_ecs_task_definition": model.LabelValue("arn:aws:ecs:us-west-2:123456789012:task-definition/standalone-task:1"), + "__meta_ecs_region": model.LabelValue("us-west-2"), + "__meta_ecs_availability_zone": model.LabelValue("us-west-2a"), + "__meta_ecs_subnet_id": model.LabelValue("subnet-standalone-1"), + "__meta_ecs_ip_address": model.LabelValue("10.0.4.10"), + "__meta_ecs_launch_type": model.LabelValue("FARGATE"), + "__meta_ecs_desired_status": model.LabelValue("RUNNING"), + "__meta_ecs_last_status": model.LabelValue("RUNNING"), + "__meta_ecs_health_status": model.LabelValue("HEALTHY"), + "__meta_ecs_network_mode": model.LabelValue("awsvpc"), + "__meta_ecs_public_ip": model.LabelValue("52.4.5.6"), + "__meta_ecs_tag_task_Role": model.LabelValue("batch"), + }, + }, + }, + }, + }, { name: "TaskWithBridgeNetworking", ecsData: &ecsDataStore{ @@ -1184,7 +1506,14 @@ func TestECSDiscoveryRefresh(t *testing.T) { groups, err := d.refresh(ctx) require.NoError(t, err) - require.Equal(t, tt.expected, groups) + if tt.name == "MixedNetworkingModes" { + // Use ElementsMatch for tests with multiple tasks as goroutines can affect order + require.Len(t, groups, len(tt.expected)) + require.Equal(t, tt.expected[0].Source, groups[0].Source) + require.ElementsMatch(t, tt.expected[0].Targets, groups[0].Targets) + } else { + require.Equal(t, tt.expected, groups) + } }) } } @@ -1381,3 +1710,98 @@ func (m *mockECSEC2Client) DescribeNetworkInterfaces(_ context.Context, input *e NetworkInterfaces: networkInterfaces, }, nil } + +func TestIsStandaloneTask(t *testing.T) { + tests := []struct { + name string + task ecsTypes.Task + expected bool + }{ + { + name: "StandaloneTask", + task: ecsTypes.Task{ + Group: strptr("family:my-task-definition"), + }, + expected: true, + }, + { + name: "ServiceTask", + task: ecsTypes.Task{ + Group: strptr("service:my-service"), + }, + expected: false, + }, + { + name: "ServiceTaskWithColon", + task: ecsTypes.Task{ + Group: strptr("service:my:service:name"), + }, + expected: false, + }, + { + name: "NilGroup", + task: ecsTypes.Task{ + Group: nil, + }, + expected: false, + }, + { + name: "EmptyGroup", + task: ecsTypes.Task{ + Group: strptr(""), + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isStandaloneTask(tt.task) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestGetServiceNameFromTaskGroup(t *testing.T) { + tests := []struct { + name string + task ecsTypes.Task + expected string + }{ + { + name: "SimpleServiceName", + task: ecsTypes.Task{ + Group: strptr("service:my-service"), + }, + expected: "my-service", + }, + { + name: "ServiceNameWithHyphens", + task: ecsTypes.Task{ + Group: strptr("service:web-api-service"), + }, + expected: "web-api-service", + }, + { + name: "ServiceNameWithColons", + task: ecsTypes.Task{ + Group: strptr("service:my:service:name"), + }, + expected: "my", + }, + { + name: "FamilyGroup", + task: ecsTypes.Task{ + Group: strptr("family:my-task-def"), + }, + expected: "my-task-def", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getServiceNameFromTaskGroup(tt.task) + require.Equal(t, tt.expected, result) + }) + } +}