From 5329355ffb552a027fe5ddd0c002f890d4d345cf Mon Sep 17 00:00:00 2001 From: matt-gp Date: Fri, 6 Feb 2026 10:12:48 +0000 Subject: [PATCH] AWS SD: ECS Standalone Tasks The current ECS role in AWS SD assumes that a task is part of a service. This means that tasks that are started as part of AWS Batch will get missed and not be discovered. This changed fixes this so that standalone tasks can be discovered as well. Signed-off-by: matt-gp --- discovery/aws/ecs.go | 993 ++++++++++++++++++++------------------ discovery/aws/ecs_test.go | 836 ++++++++++++++++++++++++-------- 2 files changed, 1162 insertions(+), 667 deletions(-) diff --git a/discovery/aws/ecs.go b/discovery/aws/ecs.go index 1d5ff366de..9ecfcc44fe 100644 --- a/discovery/aws/ecs.go +++ b/discovery/aws/ecs.go @@ -19,7 +19,9 @@ import ( "fmt" "log/slog" "net" + "slices" "strconv" + "strings" "sync" "time" @@ -273,7 +275,6 @@ func (d *ECSDiscovery) initEcsClient(ctx context.Context) error { // listClusterARNs returns a slice of cluster arns. // This method does not use concurrency as it's a simple paginated call. -// AWS ECS Cluster read actions have burst=50, sustained=20 req/sec limits. func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) { var ( clusterARNs []string @@ -281,7 +282,8 @@ func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) { ) for { resp, err := d.ecs.ListClusters(ctx, &ecs.ListClustersInput{ - NextToken: nextToken, + NextToken: nextToken, + MaxResults: aws.Int32(100), }) if err != nil { return nil, fmt.Errorf("could not list clusters: %w", err) @@ -299,56 +301,61 @@ func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) { } // describeClusters returns a map of cluster ARN to a slice of clusters. -// This method processes clusters in batches without concurrency as it's typically -// a single call handling up to 100 clusters. AWS ECS Cluster read actions have -// burst=50, sustained=20 req/sec limits. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Clusters are described in batches of 100 to respect AWS API limits (DescribeClusters allows up to 100 clusters per call). func (d *ECSDiscovery) describeClusters(ctx context.Context, clusters []string) (map[string]types.Cluster, error) { + mu := sync.Mutex{} clusterMap := make(map[string]types.Cluster) - - // AWS DescribeClusters can handle up to 100 clusters per call - batchSize := 100 - for _, batch := range batchSlice(clusters, batchSize) { - resp, err := d.ecs.DescribeClusters(ctx, &ecs.DescribeClustersInput{ - Clusters: batch, - Include: []types.ClusterField{"TAGS"}, - }) - if err != nil { - d.logger.Error("Failed to describe clusters", "clusters", batch, "error", err) - return nil, fmt.Errorf("could not describe clusters %v: %w", batch, err) - } - - for _, c := range resp.Clusters { - if c.ClusterArn != nil { - clusterMap[*c.ClusterArn] = c + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + for batch := range slices.Chunk(clusters, 100) { + errg.Go(func() error { + resp, err := d.ecs.DescribeClusters(ectx, &ecs.DescribeClustersInput{ + Clusters: batch, + Include: []types.ClusterField{"TAGS"}, + }) + if err != nil { + d.logger.Error("Failed to describe clusters", "clusters", batch, "error", err) + return fmt.Errorf("could not describe clusters %v: %w", batch, err) } - } + + for _, cluster := range resp.Clusters { + if cluster.ClusterArn != nil { + mu.Lock() + clusterMap[*cluster.ClusterArn] = cluster + mu.Unlock() + } + } + return nil + }) } - return clusterMap, nil + return clusterMap, errg.Wait() } // listServiceARNs returns a map of cluster ARN to a slice of service ARNs. // Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. -// AWS ECS Service read actions have burst=100, sustained=20 req/sec limits. +// Services are listed in batches of 100 to respect AWS API limits (ListServices allows up to 100 services per call). func (d *ECSDiscovery) listServiceARNs(ctx context.Context, clusters []string) (map[string][]string, error) { - serviceARNsMu := sync.Mutex{} - serviceARNs := make(map[string][]string) + mu := sync.Mutex{} + services := make(map[string][]string) errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) for _, clusterARN := range clusters { errg.Go(func() error { var nextToken *string - var clusterServiceARNs []string + var serviceARNs []string for { resp, err := d.ecs.ListServices(ectx, &ecs.ListServicesInput{ - Cluster: aws.String(clusterARN), - NextToken: nextToken, + Cluster: aws.String(clusterARN), + NextToken: nextToken, + MaxResults: aws.Int32(100), }) if err != nil { return fmt.Errorf("could not list services for cluster %q: %w", clusterARN, err) } - clusterServiceARNs = append(clusterServiceARNs, resp.ServiceArns...) + serviceARNs = append(serviceARNs, resp.ServiceArns...) if resp.NextToken == nil { break @@ -356,75 +363,76 @@ func (d *ECSDiscovery) listServiceARNs(ctx context.Context, clusters []string) ( nextToken = resp.NextToken } - serviceARNsMu.Lock() - serviceARNs[clusterARN] = clusterServiceARNs - serviceARNsMu.Unlock() + mu.Lock() + services[clusterARN] = serviceARNs + mu.Unlock() return nil }) } - return serviceARNs, errg.Wait() -} - -// describeServices returns a map of cluster ARN to services. -// Uses concurrent requests with batching (10 services per request) to respect AWS API limits. -// AWS ECS Service read actions have burst=100, sustained=20 req/sec limits. -func (d *ECSDiscovery) describeServices(ctx context.Context, clusterServiceARNsMap map[string][]string) (map[string][]types.Service, error) { - batchSize := 10 // AWS DescribeServices API limit is 10 services per request - serviceMu := sync.Mutex{} - services := make(map[string][]types.Service) - errg, ectx := errgroup.WithContext(ctx) - errg.SetLimit(d.cfg.RequestConcurrency) - for clusterARN, serviceARNs := range clusterServiceARNsMap { - for _, batch := range batchSlice(serviceARNs, batchSize) { - errg.Go(func() error { - resp, err := d.ecs.DescribeServices(ectx, &ecs.DescribeServicesInput{ - Services: batch, - Cluster: aws.String(clusterARN), - Include: []types.ServiceField{"TAGS"}, - }) - if err != nil { - d.logger.Error("Failed to describe services", "cluster", clusterARN, "batch", batch, "error", err) - return fmt.Errorf("could not describe services for cluster %q: %w", clusterARN, err) - } - - serviceMu.Lock() - services[clusterARN] = append(services[clusterARN], resp.Services...) - serviceMu.Unlock() - - return nil - }) - } - } - return services, errg.Wait() } -// listTaskARNs returns a map of service ARN to a slice of task ARNs. +// describeServices returns a map of service name to service. // Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. -// AWS ECS Cluster resource read actions have burst=100, sustained=20 req/sec limits. -func (d *ECSDiscovery) listTaskARNs(ctx context.Context, services []types.Service) (map[string][]string, error) { - taskARNsMu := sync.Mutex{} - taskARNs := make(map[string][]string) +// Services are described in batches of 10 to respect AWS API limits (DescribeServices allows up to 10 services per call). +func (d *ECSDiscovery) describeServices(ctx context.Context, clusterARN string, serviceARNS []string) (map[string]types.Service, error) { + mu := sync.Mutex{} + services := make(map[string]types.Service) errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) - for _, service := range services { + for batch := range slices.Chunk(serviceARNS, 10) { errg.Go(func() error { - serviceArn := aws.ToString(service.ServiceArn) + resp, err := d.ecs.DescribeServices(ectx, &ecs.DescribeServicesInput{ + Cluster: aws.String(clusterARN), + Services: batch, + Include: []types.ServiceField{"TAGS"}, + }) + if err != nil { + d.logger.Error("Failed to describe services", "cluster", clusterARN, "batch", batch, "error", err) + return fmt.Errorf("could not describe services for cluster %q: batch %v: %w", clusterARN, batch, err) + } - var nextToken *string - var serviceTaskARNs []string + for _, service := range resp.Services { + if service.ServiceArn != nil { + mu.Lock() + services[*service.ServiceName] = service + mu.Unlock() + } + } + return nil + }) + } + + return services, errg.Wait() +} + +// listTaskARNs returns a map of clustersARN to a slice of task ARNs. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Tasks are listed in batches of 100 to respect AWS API limits (ListTasks allows up to 100 tasks per call). +// This method also uses pagination to handle cases where there are more than 100 tasks in a cluster. +func (d *ECSDiscovery) listTaskARNs(ctx context.Context, clusterARNs []string) (map[string][]string, error) { + mu := sync.Mutex{} + tasks := make(map[string][]string) + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + for _, clusterARN := range clusterARNs { + errg.Go(func() error { + var ( + nextToken *string + taskARNs []string + ) for { resp, err := d.ecs.ListTasks(ectx, &ecs.ListTasksInput{ - Cluster: aws.String(*service.ClusterArn), - ServiceName: aws.String(*service.ServiceName), - NextToken: nextToken, + Cluster: aws.String(clusterARN), + NextToken: nextToken, + MaxResults: aws.Int32(100), }) if err != nil { - return fmt.Errorf("could not list tasks for service %q: %w", serviceArn, err) + return fmt.Errorf("could not list tasks for cluster %q: %w", clusterARN, err) } - serviceTaskARNs = append(serviceTaskARNs, resp.TaskArns...) + taskARNs = append(taskARNs, resp.TaskArns...) if resp.NextToken == nil { break @@ -432,77 +440,87 @@ func (d *ECSDiscovery) listTaskARNs(ctx context.Context, services []types.Servic nextToken = resp.NextToken } - taskARNsMu.Lock() - taskARNs[serviceArn] = serviceTaskARNs - taskARNsMu.Unlock() + mu.Lock() + tasks[clusterARN] = taskARNs + mu.Unlock() return nil }) } - return taskARNs, errg.Wait() + return tasks, errg.Wait() } -// describeTasks returns a map of task arn to a slice task. -// Uses concurrent requests with batching (100 tasks per request) to respect AWS API limits. -// AWS ECS Cluster resource read actions have burst=100, sustained=20 req/sec limits. -func (d *ECSDiscovery) describeTasks(ctx context.Context, clusterARN string, taskARNsMap map[string][]string) (map[string][]types.Task, error) { - batchSize := 100 // AWS DescribeTasks API limit is 100 tasks per request - taskMu := sync.Mutex{} - tasks := make(map[string][]types.Task) +// describeTasks returns a slice of tasks. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Tasks are described in batches of 100 to respect AWS API limits (DescribeTasks allows up to 100 tasks per call). +func (d *ECSDiscovery) describeTasks(ctx context.Context, clusterARN string, taskARNs []string) ([]types.Task, error) { + mu := sync.Mutex{} + var tasks []types.Task errg, ectx := errgroup.WithContext(ctx) errg.SetLimit(d.cfg.RequestConcurrency) - for serviceARN, taskARNs := range taskARNsMap { - for _, batch := range batchSlice(taskARNs, batchSize) { - errg.Go(func() error { - resp, err := d.ecs.DescribeTasks(ectx, &ecs.DescribeTasksInput{ - Cluster: aws.String(clusterARN), - Tasks: batch, - Include: []types.TaskField{"TAGS"}, - }) - if err != nil { - d.logger.Error("Failed to describe tasks", "service", serviceARN, "cluster", clusterARN, "batch", batch, "error", err) - return fmt.Errorf("could not describe tasks for service %q in cluster %q: %w", serviceARN, clusterARN, err) - } - - taskMu.Lock() - tasks[serviceARN] = append(tasks[serviceARN], resp.Tasks...) - taskMu.Unlock() - - return nil + for batch := range slices.Chunk(taskARNs, 100) { + errg.Go(func() error { + resp, err := d.ecs.DescribeTasks(ectx, &ecs.DescribeTasksInput{ + Cluster: aws.String(clusterARN), + Tasks: batch, + Include: []types.TaskField{"TAGS"}, }) - } + if err != nil { + d.logger.Error("Failed to describe tasks", "cluster", clusterARN, "batch", batch, "error", err) + return fmt.Errorf("could not describe tasks in cluster %q: batch %v: %w", clusterARN, batch, err) + } + + mu.Lock() + tasks = append(tasks, resp.Tasks...) + mu.Unlock() + return nil + }) } return tasks, errg.Wait() } // describeContainerInstances returns a map of container instance ARN to EC2 instance ID -// Uses batching to respect AWS API limits (100 container instances per request). -func (d *ECSDiscovery) describeContainerInstances(ctx context.Context, clusterARN string, containerInstanceARNs []string) (map[string]string, error) { +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// Container instances are described in batches of 100 to respect AWS API limits (DescribeContainerInstances allows up to 100 container instances per call). +func (d *ECSDiscovery) describeContainerInstances(ctx context.Context, clusterARN string, tasks []types.Task) (map[string]string, error) { + containerInstanceARNs := make([]string, 0, len(tasks)) + for _, task := range tasks { + if task.ContainerInstanceArn != nil { + containerInstanceARNs = append(containerInstanceARNs, *task.ContainerInstanceArn) + } + } + if len(containerInstanceARNs) == 0 { return make(map[string]string), nil } + mu := sync.Mutex{} containerInstToEC2 := make(map[string]string) - batchSize := 100 // AWS API limit - - for _, batch := range batchSlice(containerInstanceARNs, batchSize) { - resp, err := d.ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{ - Cluster: aws.String(clusterARN), - ContainerInstances: batch, - }) - if err != nil { - return nil, fmt.Errorf("could not describe container instances: %w", err) - } - - for _, ci := range resp.ContainerInstances { - if ci.ContainerInstanceArn != nil && ci.Ec2InstanceId != nil { - containerInstToEC2[*ci.ContainerInstanceArn] = *ci.Ec2InstanceId + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + for batch := range slices.Chunk(containerInstanceARNs, 100) { + errg.Go(func() error { + resp, err := d.ecs.DescribeContainerInstances(ectx, &ecs.DescribeContainerInstancesInput{ + Cluster: aws.String(clusterARN), + ContainerInstances: batch, + }) + if err != nil { + return fmt.Errorf("could not describe container instances: %w", err) } - } + + for _, ci := range resp.ContainerInstances { + if ci.ContainerInstanceArn != nil && ci.Ec2InstanceId != nil { + mu.Lock() + containerInstToEC2[*ci.ContainerInstanceArn] = *ci.Ec2InstanceId + mu.Unlock() + } + } + return nil + }) } - return containerInstToEC2, nil + return containerInstToEC2, errg.Wait() } // ec2InstanceInfo holds information retrieved from EC2 DescribeInstances. @@ -515,83 +533,112 @@ type ec2InstanceInfo struct { } // describeEC2Instances returns a map of EC2 instance ID to instance information. +// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling. +// This method does not use concurrency as it's a simple paginated call. func (d *ECSDiscovery) describeEC2Instances(ctx context.Context, instanceIDs []string) (map[string]ec2InstanceInfo, error) { if len(instanceIDs) == 0 { return make(map[string]ec2InstanceInfo), nil } instanceInfo := make(map[string]ec2InstanceInfo) + var nextToken *string - resp, err := d.ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ - InstanceIds: instanceIDs, - }) - if err != nil { - return nil, fmt.Errorf("could not describe EC2 instances: %w", err) - } + for { + resp, err := d.ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ + InstanceIds: instanceIDs, + NextToken: nextToken, + }) + if err != nil { + return nil, fmt.Errorf("could not describe EC2 instances: %w", err) + } - for _, reservation := range resp.Reservations { - for _, instance := range reservation.Instances { - if instance.InstanceId != nil && instance.PrivateIpAddress != nil { - info := ec2InstanceInfo{ - privateIP: *instance.PrivateIpAddress, - tags: make(map[string]string), - } - if instance.PublicIpAddress != nil { - info.publicIP = *instance.PublicIpAddress - } - if instance.SubnetId != nil { - info.subnetID = *instance.SubnetId - } - if instance.InstanceType != "" { - info.instanceType = string(instance.InstanceType) - } - // Collect EC2 instance tags - for _, tag := range instance.Tags { - if tag.Key != nil && tag.Value != nil { - info.tags[*tag.Key] = *tag.Value + for _, reservation := range resp.Reservations { + for _, instance := range reservation.Instances { + if instance.InstanceId != nil && instance.PrivateIpAddress != nil { + info := ec2InstanceInfo{ + privateIP: *instance.PrivateIpAddress, + tags: make(map[string]string), } + if instance.PublicIpAddress != nil { + info.publicIP = *instance.PublicIpAddress + } + if instance.SubnetId != nil { + info.subnetID = *instance.SubnetId + } + if instance.InstanceType != "" { + info.instanceType = string(instance.InstanceType) + } + // Collect EC2 instance tags + for _, tag := range instance.Tags { + if tag.Key != nil && tag.Value != nil { + info.tags[*tag.Key] = *tag.Value + } + } + instanceInfo[*instance.InstanceId] = info } - instanceInfo[*instance.InstanceId] = info } } + + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken } return instanceInfo, nil } // describeNetworkInterfaces returns a map of ENI ID to public IP address. -func (d *ECSDiscovery) describeNetworkInterfaces(ctx context.Context, eniIDs []string) (map[string]string, error) { +// This is needed to get the public IP for tasks using awsvpc network mode, as the ENI is what gets the public IP, not the EC2 instance. +// This method does not use concurrency as it's a simple paginated call. +func (d *ECSDiscovery) describeNetworkInterfaces(ctx context.Context, tasks []types.Task) (map[string]string, error) { + eniIDs := make([]string, 0, len(tasks)) + + for _, task := range tasks { + for _, attachment := range task.Attachments { + if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { + for _, detail := range attachment.Details { + if detail.Name != nil && *detail.Name == "networkInterfaceId" && detail.Value != nil { + eniIDs = append(eniIDs, *detail.Value) + break + } + } + break + } + } + } + if len(eniIDs) == 0 { return make(map[string]string), nil } eniToPublicIP := make(map[string]string) + var nextToken *string - resp, err := d.ec2.DescribeNetworkInterfaces(ctx, &ec2.DescribeNetworkInterfacesInput{ - NetworkInterfaceIds: eniIDs, - }) - if err != nil { - return nil, fmt.Errorf("could not describe network interfaces: %w", err) - } - - for _, eni := range resp.NetworkInterfaces { - if eni.NetworkInterfaceId != nil && eni.Association != nil && eni.Association.PublicIp != nil { - eniToPublicIP[*eni.NetworkInterfaceId] = *eni.Association.PublicIp + for { + resp, err := d.ec2.DescribeNetworkInterfaces(ctx, &ec2.DescribeNetworkInterfacesInput{ + NetworkInterfaceIds: eniIDs, + NextToken: nextToken, + }) + if err != nil { + return nil, fmt.Errorf("could not describe network interfaces: %w", err) } + + for _, eni := range resp.NetworkInterfaces { + if eni.NetworkInterfaceId != nil && eni.Association != nil && eni.Association.PublicIp != nil { + eniToPublicIP[*eni.NetworkInterfaceId] = *eni.Association.PublicIp + } + } + + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken } return eniToPublicIP, nil } -func batchSlice[T any](a []T, size int) [][]T { - batches := make([][]T, 0, len(a)/size+1) - for i := 0; i < len(a); i += size { - end := min(i+size, len(a)) - batches = append(batches, a[i:end]) - } - return batches -} - func (d *ECSDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { err := d.initEcsClient(ctx) if err != nil { @@ -620,314 +667,338 @@ func (d *ECSDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error Source: d.cfg.Region, } - clusterARNMap, err := d.describeClusters(ctx, clusters) - if err != nil { - return nil, err - } + // Fetch cluster details, service ARNs, and task ARNs in parallel + var ( + clusterMap map[string]types.Cluster + serviceMap map[string][]string + taskMap map[string][]string + ) - clusterServiceARNMap, err := d.listServiceARNs(ctx, clusters) - if err != nil { - return nil, err - } + clusterErrg, clusterCtx := errgroup.WithContext(ctx) + clusterErrg.Go(func() error { + var err error + clusterMap, err = d.describeClusters(clusterCtx, clusters) + return err + }) + clusterErrg.Go(func() error { + var err error + serviceMap, err = d.listServiceARNs(clusterCtx, clusters) + return err + }) + clusterErrg.Go(func() error { + var err error + taskMap, err = d.listTaskARNs(clusterCtx, clusters) + return err + }) - clusterServicesMap, err := d.describeServices(ctx, clusterServiceARNMap) - if err != nil { + if err := clusterErrg.Wait(); err != nil { return nil, err } // Use goroutines to process clusters in parallel var ( - targetsMu sync.Mutex - wg sync.WaitGroup + clusterWg sync.WaitGroup + clusterMu sync.Mutex + clusterTargets []model.LabelSet ) - for clusterArn, clusterServices := range clusterServicesMap { - if len(clusterServices) == 0 { + for clusterARN, taskARNs := range taskMap { + if len(taskARNs) == 0 { continue } - wg.Add(1) - go func(clusterArn string, clusterServices []types.Service) { - defer wg.Done() + clusterWg.Add(1) - serviceTaskARNMap, err := d.listTaskARNs(ctx, clusterServices) - if err != nil { - d.logger.Error("Failed to list task ARNs for cluster", "cluster", clusterArn, "error", err) - return - } + go func(cluster types.Cluster, serviceARNs, taskARNs []string) { + defer clusterWg.Done() - serviceTaskMap, err := d.describeTasks(ctx, clusterArn, serviceTaskARNMap) - if err != nil { - d.logger.Error("Failed to describe tasks for cluster", "cluster", clusterArn, "error", err) - return - } - - // Process services within this cluster in parallel + // Fetch services and tasks in parallel (they're independent) var ( - serviceWg sync.WaitGroup - localTargets []model.LabelSet - localTargetsMu sync.Mutex + services map[string]types.Service + tasks []types.Task ) - for _, clusterService := range clusterServices { - serviceWg.Add(1) - go func(clusterService types.Service) { - defer serviceWg.Done() + resourceErrg, resourceCtx := errgroup.WithContext(ctx) + resourceErrg.Go(func() error { + var err error + services, err = d.describeServices(resourceCtx, *cluster.ClusterArn, serviceARNs) + if err != nil { + d.logger.Error("Failed to describe services for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) + resourceErrg.Go(func() error { + var err error + tasks, err = d.describeTasks(resourceCtx, *cluster.ClusterArn, taskARNs) + if err != nil { + d.logger.Error("Failed to describe tasks for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) - serviceArn := *clusterService.ServiceArn - - if tasks, exists := serviceTaskMap[serviceArn]; exists { - var serviceTargets []model.LabelSet - - // Collect container instance ARNs for all EC2 tasks to get instance type - var containerInstanceARNs []string - taskToContainerInstance := make(map[string]string) - // Collect ENI IDs for awsvpc tasks to get public IPs - var eniIDs []string - taskToENI := make(map[string]string) - - for _, task := range tasks { - // Collect container instance ARN for any task running on EC2 - if task.ContainerInstanceArn != nil { - containerInstanceARNs = append(containerInstanceARNs, *task.ContainerInstanceArn) - taskToContainerInstance[*task.TaskArn] = *task.ContainerInstanceArn - } - - // Collect ENI IDs from awsvpc tasks - for _, attachment := range task.Attachments { - if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { - for _, detail := range attachment.Details { - if detail.Name != nil && *detail.Name == "networkInterfaceId" && detail.Value != nil { - eniIDs = append(eniIDs, *detail.Value) - taskToENI[*task.TaskArn] = *detail.Value - break - } - } - break - } - } - } - - // Batch describe container instances and EC2 instances to get instance type and other metadata - var containerInstToEC2 map[string]string - var ec2InstInfo map[string]ec2InstanceInfo - if len(containerInstanceARNs) > 0 { - var err error - containerInstToEC2, err = d.describeContainerInstances(ctx, clusterArn, containerInstanceARNs) - if err != nil { - d.logger.Error("Failed to describe container instances", "cluster", clusterArn, "error", err) - // Continue processing tasks - } else { - // Collect unique EC2 instance IDs - ec2InstanceIDs := make([]string, 0, len(containerInstToEC2)) - for _, ec2ID := range containerInstToEC2 { - ec2InstanceIDs = append(ec2InstanceIDs, ec2ID) - } - - // Batch describe EC2 instances - ec2InstInfo, err = d.describeEC2Instances(ctx, ec2InstanceIDs) - if err != nil { - d.logger.Error("Failed to describe EC2 instances", "cluster", clusterArn, "error", err) - } - } - } - - // Batch describe ENIs to get public IPs for awsvpc tasks - var eniToPublicIP map[string]string - if len(eniIDs) > 0 { - var err error - eniToPublicIP, err = d.describeNetworkInterfaces(ctx, eniIDs) - if err != nil { - d.logger.Error("Failed to describe network interfaces", "cluster", clusterArn, "error", err) - // Continue processing without ENI public IPs - } - } - - for _, task := range tasks { - var ipAddress, subnetID, publicIP string - var networkMode string - var ec2InstanceID, ec2InstanceType, ec2InstancePrivateIP, ec2InstancePublicIP string - - // Try to get IP from ENI attachment (awsvpc mode) - var eniAttachment *types.Attachment - for _, attachment := range task.Attachments { - if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { - eniAttachment = &attachment - break - } - } - - if eniAttachment != nil { - // awsvpc networking mode - get IP from ENI - networkMode = "awsvpc" - for _, detail := range eniAttachment.Details { - switch *detail.Name { - case "privateIPv4Address": - ipAddress = *detail.Value - case "subnetId": - subnetID = *detail.Value - } - } - // Get public IP from ENI if available - if eniID, ok := taskToENI[*task.TaskArn]; ok { - if eniPublicIP, ok := eniToPublicIP[eniID]; ok { - publicIP = eniPublicIP - } - } - } else if task.ContainerInstanceArn != nil { - // bridge/host networking mode - need to get EC2 instance IP and subnet - networkMode = "bridge" - containerInstARN, ok := taskToContainerInstance[*task.TaskArn] - if ok { - ec2InstanceID, ok = containerInstToEC2[containerInstARN] - if ok { - info, ok := ec2InstInfo[ec2InstanceID] - if ok { - ipAddress = info.privateIP - publicIP = info.publicIP - subnetID = info.subnetID - ec2InstanceType = info.instanceType - ec2InstancePrivateIP = info.privateIP - ec2InstancePublicIP = info.publicIP - } else { - d.logger.Debug("EC2 instance info not found", "instance", ec2InstanceID, "task", *task.TaskArn) - } - } else { - d.logger.Debug("Container instance not found in map", "arn", containerInstARN, "task", *task.TaskArn) - } - } - } - - // Get EC2 instance metadata for awsvpc tasks running on EC2 - // We want the instance type and the host IPs for advanced use cases - if networkMode == "awsvpc" && task.ContainerInstanceArn != nil { - containerInstARN, ok := taskToContainerInstance[*task.TaskArn] - if ok { - ec2InstanceID, ok = containerInstToEC2[containerInstARN] - if ok { - info, ok := ec2InstInfo[ec2InstanceID] - if ok { - ec2InstanceType = info.instanceType - ec2InstancePrivateIP = info.privateIP - ec2InstancePublicIP = info.publicIP - } - } - } - } - - if ipAddress == "" { - continue - } - - labels := model.LabelSet{ - ecsLabelClusterARN: model.LabelValue(*clusterService.ClusterArn), - ecsLabelService: model.LabelValue(*clusterService.ServiceName), - ecsLabelServiceARN: model.LabelValue(*clusterService.ServiceArn), - ecsLabelServiceStatus: model.LabelValue(*clusterService.Status), - ecsLabelTaskGroup: model.LabelValue(*task.Group), - ecsLabelTaskARN: model.LabelValue(*task.TaskArn), - ecsLabelTaskDefinition: model.LabelValue(*task.TaskDefinitionArn), - ecsLabelIPAddress: model.LabelValue(ipAddress), - ecsLabelRegion: model.LabelValue(d.cfg.Region), - ecsLabelLaunchType: model.LabelValue(task.LaunchType), - ecsLabelAvailabilityZone: model.LabelValue(*task.AvailabilityZone), - ecsLabelDesiredStatus: model.LabelValue(*task.DesiredStatus), - ecsLabelLastStatus: model.LabelValue(*task.LastStatus), - ecsLabelHealthStatus: model.LabelValue(task.HealthStatus), - ecsLabelNetworkMode: model.LabelValue(networkMode), - } - - // Add subnet ID when available (awsvpc mode from ENI, bridge/host from EC2 instance) - if subnetID != "" { - labels[ecsLabelSubnetID] = model.LabelValue(subnetID) - } - - // Add container instance and EC2 instance info for EC2 launch type - if task.ContainerInstanceArn != nil { - labels[ecsLabelContainerInstanceARN] = model.LabelValue(*task.ContainerInstanceArn) - } - if ec2InstanceID != "" { - labels[ecsLabelEC2InstanceID] = model.LabelValue(ec2InstanceID) - } - if ec2InstanceType != "" { - labels[ecsLabelEC2InstanceType] = model.LabelValue(ec2InstanceType) - } - if ec2InstancePrivateIP != "" { - labels[ecsLabelEC2InstancePrivateIP] = model.LabelValue(ec2InstancePrivateIP) - } - if ec2InstancePublicIP != "" { - labels[ecsLabelEC2InstancePublicIP] = model.LabelValue(ec2InstancePublicIP) - } - if publicIP != "" { - labels[ecsLabelPublicIP] = model.LabelValue(publicIP) - } - - if task.PlatformFamily != nil { - labels[ecsLabelPlatformFamily] = model.LabelValue(*task.PlatformFamily) - } - if task.PlatformVersion != nil { - labels[ecsLabelPlatformVersion] = model.LabelValue(*task.PlatformVersion) - } - - labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(ipAddress, strconv.Itoa(d.cfg.Port))) - - // Add cluster tags - if cluster, exists := clusterARNMap[*clusterService.ClusterArn]; exists { - if cluster.ClusterName != nil { - labels[ecsLabelCluster] = model.LabelValue(*cluster.ClusterName) - } - - for _, clusterTag := range cluster.Tags { - if clusterTag.Key != nil && clusterTag.Value != nil { - labels[model.LabelName(ecsLabelTagCluster+strutil.SanitizeLabelName(*clusterTag.Key))] = model.LabelValue(*clusterTag.Value) - } - } - } - - // Add service tags - for _, serviceTag := range clusterService.Tags { - if serviceTag.Key != nil && serviceTag.Value != nil { - labels[model.LabelName(ecsLabelTagService+strutil.SanitizeLabelName(*serviceTag.Key))] = model.LabelValue(*serviceTag.Value) - } - } - - // Add task tags - for _, taskTag := range task.Tags { - if taskTag.Key != nil && taskTag.Value != nil { - labels[model.LabelName(ecsLabelTagTask+strutil.SanitizeLabelName(*taskTag.Key))] = model.LabelValue(*taskTag.Value) - } - } - - // Add EC2 instance tags (if running on EC2) - if ec2InstanceID != "" { - if info, ok := ec2InstInfo[ec2InstanceID]; ok { - for tagKey, tagValue := range info.tags { - labels[model.LabelName(ecsLabelTagEC2+strutil.SanitizeLabelName(tagKey))] = model.LabelValue(tagValue) - } - } - } - - serviceTargets = append(serviceTargets, labels) - } - - // Add service targets to local targets with mutex protection - localTargetsMu.Lock() - localTargets = append(localTargets, serviceTargets...) - localTargetsMu.Unlock() - } - }(clusterService) + if err := resourceErrg.Wait(); err != nil { + return } - serviceWg.Wait() + // Fetch container instances and network interfaces in parallel (both depend on tasks) + var ( + containerInstances map[string]string + eniToPublicIP map[string]string + ) - // Add all local targets to main target group with mutex protection - targetsMu.Lock() - tg.Targets = append(tg.Targets, localTargets...) - targetsMu.Unlock() - }(clusterArn, clusterServices) + instanceErrg, instanceCtx := errgroup.WithContext(ctx) + instanceErrg.Go(func() error { + var err error + containerInstances, err = d.describeContainerInstances(instanceCtx, *cluster.ClusterArn, tasks) + if err != nil { + d.logger.Error("Failed to describe container instances for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) + instanceErrg.Go(func() error { + var err error + eniToPublicIP, err = d.describeNetworkInterfaces(instanceCtx, tasks) + if err != nil { + d.logger.Error("Failed to describe network interfaces for cluster", "cluster", *cluster.ClusterArn, "error", err) + } + return err + }) + + if err := instanceErrg.Wait(); err != nil { + return + } + + ec2Instances := make(map[string]ec2InstanceInfo) + if len(containerInstances) > 0 { + // Deduplicate EC2 instance IDs (multiple tasks can share the same instance) + ec2InstanceIDSet := make(map[string]struct{}) + for _, ec2ID := range containerInstances { + ec2InstanceIDSet[ec2ID] = struct{}{} + } + ec2InstanceIDs := make([]string, 0, len(ec2InstanceIDSet)) + for ec2ID := range ec2InstanceIDSet { + ec2InstanceIDs = append(ec2InstanceIDs, ec2ID) + } + ec2Instances, err = d.describeEC2Instances(ctx, ec2InstanceIDs) + if err != nil { + d.logger.Error("Failed to describe EC2 instances for cluster", "cluster", *cluster.ClusterArn, "error", err) + return + } + } + + var ( + taskWg sync.WaitGroup + taskMu sync.Mutex + taskTargets []model.LabelSet + ) + + for _, task := range tasks { + taskWg.Add(1) + + go func(cluster types.Cluster, services map[string]types.Service, task types.Task, containerInstances map[string]string, ec2Instances map[string]ec2InstanceInfo, eniToPublicIP map[string]string) { + defer taskWg.Done() + + var ( + ipAddress, subnetID, publicIP string + networkMode string + ec2InstanceID, ec2InstanceType, ec2InstancePrivateIP, ec2InstancePublicIP string + ) + + // Try to get IP from ENI attachment (awsvpc mode) + var eniAttachment *types.Attachment + for _, attachment := range task.Attachments { + if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" { + eniAttachment = &attachment + break + } + } + + if eniAttachment != nil { + // awsvpc networking mode - get IP from ENI + networkMode = "awsvpc" + var eniID string + for _, detail := range eniAttachment.Details { + switch *detail.Name { + case "privateIPv4Address": + ipAddress = *detail.Value + case "subnetId": + subnetID = *detail.Value + case "networkInterfaceId": + eniID = *detail.Value + } + } + // Get public IP from ENI if available + if eniID != "" { + if pub, ok := eniToPublicIP[eniID]; ok { + publicIP = pub + } + } + } else if task.ContainerInstanceArn != nil { + // bridge/host networking mode - need to get EC2 instance IP and subnet + networkMode = "bridge" + var ok bool + ec2InstanceID, ok = containerInstances[*task.ContainerInstanceArn] + if ok { + info, ok := ec2Instances[ec2InstanceID] + if ok { + ipAddress = info.privateIP + publicIP = info.publicIP + subnetID = info.subnetID + ec2InstanceType = info.instanceType + ec2InstancePrivateIP = info.privateIP + ec2InstancePublicIP = info.publicIP + } else { + d.logger.Debug("EC2 instance info not found", "instance", ec2InstanceID, "task", *task.TaskArn) + } + } else { + d.logger.Debug("Container instance not found in map", "arn", *task.ContainerInstanceArn, "task", *task.TaskArn) + } + } + + // Get EC2 instance metadata for awsvpc tasks running on EC2 + // We want the instance type and the host IPs for advanced use cases + if networkMode == "awsvpc" && task.ContainerInstanceArn != nil { + var ok bool + ec2InstanceID, ok = containerInstances[*task.ContainerInstanceArn] + if ok { + info, ok := ec2Instances[ec2InstanceID] + if ok { + ec2InstanceType = info.instanceType + ec2InstancePrivateIP = info.privateIP + ec2InstancePublicIP = info.publicIP + } + } + } + + if ipAddress == "" { + return + } + + labels := model.LabelSet{ + ecsLabelClusterARN: model.LabelValue(*cluster.ClusterArn), + ecsLabelCluster: model.LabelValue(*cluster.ClusterName), + ecsLabelTaskGroup: model.LabelValue(*task.Group), + ecsLabelTaskARN: model.LabelValue(*task.TaskArn), + ecsLabelTaskDefinition: model.LabelValue(*task.TaskDefinitionArn), + ecsLabelIPAddress: model.LabelValue(ipAddress), + ecsLabelRegion: model.LabelValue(d.cfg.Region), + ecsLabelLaunchType: model.LabelValue(task.LaunchType), + ecsLabelAvailabilityZone: model.LabelValue(*task.AvailabilityZone), + ecsLabelDesiredStatus: model.LabelValue(*task.DesiredStatus), + ecsLabelLastStatus: model.LabelValue(*task.LastStatus), + ecsLabelHealthStatus: model.LabelValue(task.HealthStatus), + ecsLabelNetworkMode: model.LabelValue(networkMode), + } + + // Add subnet ID when available (awsvpc mode from ENI, bridge/host from EC2 instance) + if subnetID != "" { + labels[ecsLabelSubnetID] = model.LabelValue(subnetID) + } + + // Add container instance and EC2 instance info for EC2 launch type + if task.ContainerInstanceArn != nil { + labels[ecsLabelContainerInstanceARN] = model.LabelValue(*task.ContainerInstanceArn) + } + if ec2InstanceID != "" { + labels[ecsLabelEC2InstanceID] = model.LabelValue(ec2InstanceID) + } + if ec2InstanceType != "" { + labels[ecsLabelEC2InstanceType] = model.LabelValue(ec2InstanceType) + } + if ec2InstancePrivateIP != "" { + labels[ecsLabelEC2InstancePrivateIP] = model.LabelValue(ec2InstancePrivateIP) + } + if ec2InstancePublicIP != "" { + labels[ecsLabelEC2InstancePublicIP] = model.LabelValue(ec2InstancePublicIP) + } + if publicIP != "" { + labels[ecsLabelPublicIP] = model.LabelValue(publicIP) + } + + if task.PlatformFamily != nil { + labels[ecsLabelPlatformFamily] = model.LabelValue(*task.PlatformFamily) + } + if task.PlatformVersion != nil { + labels[ecsLabelPlatformVersion] = model.LabelValue(*task.PlatformVersion) + } + + labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(ipAddress, strconv.Itoa(d.cfg.Port))) + + // Add cluster tags + for _, clusterTag := range cluster.Tags { + if clusterTag.Key != nil && clusterTag.Value != nil { + labels[model.LabelName(ecsLabelTagCluster+strutil.SanitizeLabelName(*clusterTag.Key))] = model.LabelValue(*clusterTag.Value) + } + } + + // If this is not a standalone task, add service information and tags + if !isStandaloneTask(task) { + service, ok := services[getServiceNameFromTaskGroup(task)] + if !ok { + d.logger.Debug("Service not found for task", "task", *task.TaskArn, "service", getServiceNameFromTaskGroup(task)) + } + if service.ServiceName != nil { + labels[ecsLabelService] = model.LabelValue(*service.ServiceName) + } + if service.ServiceArn != nil { + labels[ecsLabelServiceARN] = model.LabelValue(*service.ServiceArn) + } + if service.Status != nil { + labels[ecsLabelServiceStatus] = model.LabelValue(*service.Status) + } + + // Add service tags + for _, serviceTag := range service.Tags { + if serviceTag.Key != nil && serviceTag.Value != nil { + labels[model.LabelName(ecsLabelTagService+strutil.SanitizeLabelName(*serviceTag.Key))] = model.LabelValue(*serviceTag.Value) + } + } + } + + // Add task tags + for _, taskTag := range task.Tags { + if taskTag.Key != nil && taskTag.Value != nil { + labels[model.LabelName(ecsLabelTagTask+strutil.SanitizeLabelName(*taskTag.Key))] = model.LabelValue(*taskTag.Value) + } + } + + // Add EC2 instance tags (if running on EC2) + if ec2InstanceID != "" { + if info, ok := ec2Instances[ec2InstanceID]; ok { + for tagKey, tagValue := range info.tags { + labels[model.LabelName(ecsLabelTagEC2+strutil.SanitizeLabelName(tagKey))] = model.LabelValue(tagValue) + } + } + } + + taskMu.Lock() + taskTargets = append(taskTargets, labels) + taskMu.Unlock() + }(cluster, services, task, containerInstances, ec2Instances, eniToPublicIP) + } + + taskWg.Wait() + + // Add this cluster's task targets to the overall collection + clusterMu.Lock() + clusterTargets = append(clusterTargets, taskTargets...) + clusterMu.Unlock() + }(clusterMap[clusterARN], serviceMap[clusterARN], taskARNs) } - wg.Wait() + clusterWg.Wait() + + // Set all targets to the target group + tg.Targets = clusterTargets return []*targetgroup.Group{tg}, nil } + +func isStandaloneTask(task types.Task) bool { + // A standalone task will have a group of "family:task-def-name" + return task.Group != nil && strings.HasPrefix(*task.Group, "family:") +} + +func getServiceNameFromTaskGroup(task types.Task) string { + return strings.Split(*task.Group, ":")[1] +} diff --git a/discovery/aws/ecs_test.go b/discovery/aws/ecs_test.go index 1cb48b27fa..bb1f96a28e 100644 --- a/discovery/aws/ecs_test.go +++ b/discovery/aws/ecs_test.go @@ -214,7 +214,6 @@ func TestECSDiscoveryDescribeClusters(t *testing.T) { func TestECSDiscoveryListServiceARNs(t *testing.T) { ctx := context.Background() - // iterate through the test cases for _, tt := range []struct { name string ecsData *ecsDataStore @@ -225,33 +224,18 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { name: "SingleClusterWithServices", ecsData: &ecsDataStore{ region: "us-west-2", - clusters: []ecsTypes.Cluster{ - { - ClusterName: strptr("test-cluster"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("ACTIVE"), - }, - }, services: []ecsTypes.Service{ { ServiceName: strptr("web-service"), ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), + Status: strptr("ACTIVE"), }, { ServiceName: strptr("api-service"), ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - { - // this is to test the old arn format without the cluster name in the service arn - // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-arn-migration.html - ServiceName: strptr("old-api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/old-api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), + Status: strptr("ACTIVE"), }, }, }, @@ -260,70 +244,50 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service", "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service", - "arn:aws:ecs:us-west-2:123456789012:service/old-api-service", }, }, }, { - name: "MultipleClustesWithServices", + name: "MultipleClusters", ecsData: &ecsDataStore{ - region: "us-east-1", - clusters: []ecsTypes.Cluster{ - { - ClusterName: strptr("cluster-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("ACTIVE"), - }, - { - ClusterName: strptr("cluster-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("ACTIVE"), - }, - }, + region: "us-west-2", services: []ecsTypes.Service{ { - ServiceName: strptr("service-1"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("RUNNING"), + ServiceName: strptr("web-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/cluster-1/web-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/cluster-1"), + Status: strptr("ACTIVE"), }, { - ServiceName: strptr("service-2"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("RUNNING"), + ServiceName: strptr("api-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/cluster-2/api-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/cluster-2"), + Status: strptr("ACTIVE"), }, }, }, clusterARNs: []string{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1", - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-1", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-2", }, expected: map[string][]string{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-1": { + "arn:aws:ecs:us-west-2:123456789012:service/cluster-1/web-service", }, - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2", + "arn:aws:ecs:us-west-2:123456789012:cluster/cluster-2": { + "arn:aws:ecs:us-west-2:123456789012:service/cluster-2/api-service", }, }, }, { - name: "ClusterWithNoServices", + name: "EmptyCluster", ecsData: &ecsDataStore{ - region: "us-west-2", - clusters: []ecsTypes.Cluster{ - { - ClusterName: strptr("empty-cluster"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/empty-cluster"), - Status: strptr("ACTIVE"), - }, - }, + region: "us-west-2", services: []ecsTypes.Service{}, }, - clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/empty-cluster"}, + clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"}, expected: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:cluster/empty-cluster": nil, + "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": nil, }, }, } { @@ -334,7 +298,7 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { ecs: client, cfg: &ECSSDConfig{ Region: tt.ecsData.region, - RequestConcurrency: 1, + RequestConcurrency: 2, }, } @@ -348,113 +312,178 @@ func TestECSDiscoveryListServiceARNs(t *testing.T) { func TestECSDiscoveryDescribeServices(t *testing.T) { ctx := context.Background() - // iterate through the test cases for _, tt := range []struct { - name string - ecsData *ecsDataStore - clusterServiceARNsMap map[string][]string - expected map[string][]ecsTypes.Service + name string + ecsData *ecsDataStore + clusterARN string + serviceARNs []string + expected map[string]ecsTypes.Service }{ { - name: "SingleClusterServices", + name: "ServicesWithTags", ecsData: &ecsDataStore{ region: "us-west-2", services: []ecsTypes.Service{ { - ServiceName: strptr("web-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), + ServiceName: strptr("web-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), Tags: []ecsTypes.Tag{ {Key: strptr("Environment"), Value: strptr("production")}, + {Key: strptr("Team"), Value: strptr("platform")}, }, }, { - ServiceName: strptr("api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), - }, - }, - }, - clusterServiceARNsMap: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service", - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service", - }, - }, - expected: map[string][]ecsTypes.Service{ - "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { - { - ServiceName: strptr("web-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), + ServiceName: strptr("api-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), Tags: []ecsTypes.Tag{ - {Key: strptr("Environment"), Value: strptr("production")}, + {Key: strptr("Environment"), Value: strptr("staging")}, }, }, - { - ServiceName: strptr("api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), + }, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + serviceARNs: []string{ + "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service", + "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service", + }, + expected: map[string]ecsTypes.Service{ + "web-service": { + ServiceName: strptr("web-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), + Tags: []ecsTypes.Tag{ + {Key: strptr("Environment"), Value: strptr("production")}, + {Key: strptr("Team"), Value: strptr("platform")}, + }, + }, + "api-service": { + ServiceName: strptr("api-service"), + ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Status: strptr("ACTIVE"), + Tags: []ecsTypes.Tag{ + {Key: strptr("Environment"), Value: strptr("staging")}, }, }, }, }, { - name: "MultipleClustersServices", + name: "EmptyServiceList", ecsData: &ecsDataStore{ - region: "us-east-1", - services: []ecsTypes.Service{ + region: "us-west-2", + services: []ecsTypes.Service{}, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + serviceARNs: []string{}, + expected: map[string]ecsTypes.Service{}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + client := newMockECSClient(tt.ecsData) + + d := &ECSDiscovery{ + ecs: client, + cfg: &ECSSDConfig{ + Region: tt.ecsData.region, + RequestConcurrency: 2, + }, + } + + services, err := d.describeServices(ctx, tt.clusterARN, tt.serviceARNs) + require.NoError(t, err) + require.Equal(t, tt.expected, services) + }) + } +} + +func TestECSDiscoveryDescribeContainerInstances(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecsData *ecsDataStore + clusterARN string + tasks []ecsTypes.Task + expected map[string]string + }{ + { + name: "EC2Tasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + containerInstances: []ecsTypes.ContainerInstance{ { - ServiceName: strptr("service-1"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-1:1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + Ec2InstanceId: strptr("i-1234567890abcdef0"), }, { - ServiceName: strptr("service-2"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("DRAINING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-2:1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/xyz789"), + Ec2InstanceId: strptr("i-0987654321fedcba0"), }, }, }, - clusterServiceARNsMap: map[string][]string{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1", + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + LaunchType: ecsTypes.LaunchTypeEc2, }, - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2": { - "arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2", + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/xyz789"), + LaunchType: ecsTypes.LaunchTypeEc2, }, }, - expected: map[string][]ecsTypes.Service{ - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1": { + expected: map[string]string{ + "arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123": "i-1234567890abcdef0", + "arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/xyz789": "i-0987654321fedcba0", + }, + }, + { + name: "FargateTasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + containerInstances: []ecsTypes.ContainerInstance{}, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeFargate, + }, + }, + expected: map[string]string{}, + }, + { + name: "MixedTasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + containerInstances: []ecsTypes.ContainerInstance{ { - ServiceName: strptr("service-1"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-1/service-1"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-1"), - Status: strptr("RUNNING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-1:1"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + Ec2InstanceId: strptr("i-1234567890abcdef0"), }, }, - "arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2": { - { - ServiceName: strptr("service-2"), - ServiceArn: strptr("arn:aws:ecs:us-east-1:123456789012:service/cluster-2/service-2"), - ClusterArn: strptr("arn:aws:ecs:us-east-1:123456789012:cluster/cluster-2"), - Status: strptr("DRAINING"), - TaskDefinition: strptr("arn:aws:ecs:us-east-1:123456789012:task-definition/task-2:1"), - }, + }, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-ec2"), + ContainerInstanceArn: strptr("arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123"), + LaunchType: ecsTypes.LaunchTypeEc2, }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-fargate"), + LaunchType: ecsTypes.LaunchTypeFargate, + }, + }, + expected: map[string]string{ + "arn:aws:ecs:us-west-2:123456789012:container-instance/test-cluster/abc123": "i-1234567890abcdef0", }, }, } { @@ -465,13 +494,267 @@ func TestECSDiscoveryDescribeServices(t *testing.T) { ecs: client, cfg: &ECSSDConfig{ Region: tt.ecsData.region, - RequestConcurrency: 1, + RequestConcurrency: 2, }, } - serviceMap, err := d.describeServices(ctx, tt.clusterServiceARNsMap) + containerInstances, err := d.describeContainerInstances(ctx, tt.clusterARN, tt.tasks) require.NoError(t, err) - require.Equal(t, tt.expected, serviceMap) + require.Equal(t, tt.expected, containerInstances) + }) + } +} + +func TestECSDiscoveryDescribeEC2Instances(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecsData *ecsDataStore + instanceIDs []string + expected map[string]ec2InstanceInfo + }{ + { + name: "InstancesWithTags", + ecsData: &ecsDataStore{ + region: "us-west-2", + ec2Instances: map[string]ec2InstanceInfo{ + "i-1234567890abcdef0": { + privateIP: "10.0.1.50", + publicIP: "54.1.2.3", + subnetID: "subnet-12345", + instanceType: "t3.medium", + tags: map[string]string{ + "Name": "ecs-host-1", + "Environment": "production", + }, + }, + "i-0987654321fedcba0": { + privateIP: "10.0.1.75", + publicIP: "54.2.3.4", + subnetID: "subnet-67890", + instanceType: "t3.large", + tags: map[string]string{ + "Name": "ecs-host-2", + "Team": "platform", + }, + }, + }, + }, + instanceIDs: []string{"i-1234567890abcdef0", "i-0987654321fedcba0"}, + expected: map[string]ec2InstanceInfo{ + "i-1234567890abcdef0": { + privateIP: "10.0.1.50", + publicIP: "54.1.2.3", + subnetID: "subnet-12345", + instanceType: "t3.medium", + tags: map[string]string{ + "Name": "ecs-host-1", + "Environment": "production", + }, + }, + "i-0987654321fedcba0": { + privateIP: "10.0.1.75", + publicIP: "54.2.3.4", + subnetID: "subnet-67890", + instanceType: "t3.large", + tags: map[string]string{ + "Name": "ecs-host-2", + "Team": "platform", + }, + }, + }, + }, + { + name: "EmptyList", + ecsData: &ecsDataStore{ + region: "us-west-2", + ec2Instances: map[string]ec2InstanceInfo{}, + }, + instanceIDs: []string{}, + expected: map[string]ec2InstanceInfo{}, + }, + { + name: "InstanceWithoutPublicIP", + ecsData: &ecsDataStore{ + region: "us-west-2", + ec2Instances: map[string]ec2InstanceInfo{ + "i-privateonly": { + privateIP: "10.0.1.100", + publicIP: "", + subnetID: "subnet-private", + instanceType: "t3.micro", + tags: map[string]string{}, + }, + }, + }, + instanceIDs: []string{"i-privateonly"}, + expected: map[string]ec2InstanceInfo{ + "i-privateonly": { + privateIP: "10.0.1.100", + publicIP: "", + subnetID: "subnet-private", + instanceType: "t3.micro", + tags: map[string]string{}, + }, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ec2Client := newMockECSEC2Client(tt.ecsData.ec2Instances, nil) + + d := &ECSDiscovery{ + ec2: ec2Client, + cfg: &ECSSDConfig{ + Region: tt.ecsData.region, + RequestConcurrency: 2, + }, + } + + instances, err := d.describeEC2Instances(ctx, tt.instanceIDs) + require.NoError(t, err) + require.Equal(t, tt.expected, instances) + }) + } +} + +func TestECSDiscoveryDescribeNetworkInterfaces(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecsData *ecsDataStore + tasks []ecsTypes.Task + expected map[string]string + }{ + { + name: "AwsvpcTasksWithPublicIPs", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{ + "eni-12345": "52.1.2.3", + "eni-67890": "52.2.3.4", + }, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-12345")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.100")}, + }, + }, + }, + }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-67890")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.200")}, + }, + }, + }, + }, + }, + expected: map[string]string{ + "eni-12345": "52.1.2.3", + "eni-67890": "52.2.3.4", + }, + }, + { + name: "AwsvpcTasksWithoutPublicIPs", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{}, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-private")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.100")}, + }, + }, + }, + }, + }, + expected: map[string]string{}, + }, + { + name: "BridgeTasksNoENI", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{}, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + LaunchType: ecsTypes.LaunchTypeEc2, + // No ENI attachment for bridge networking + Attachments: []ecsTypes.Attachment{}, + }, + }, + expected: map[string]string{}, + }, + { + name: "MixedTasks", + ecsData: &ecsDataStore{ + region: "us-west-2", + eniPublicIPs: map[string]string{ + "eni-fargate": "52.1.2.3", + }, + }, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-fargate"), + LaunchType: ecsTypes.LaunchTypeFargate, + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("networkInterfaceId"), Value: strptr("eni-fargate")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.1.100")}, + }, + }, + }, + }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-bridge"), + LaunchType: ecsTypes.LaunchTypeEc2, + Attachments: []ecsTypes.Attachment{}, + }, + }, + expected: map[string]string{ + "eni-fargate": "52.1.2.3", + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ec2Client := newMockECSEC2Client(nil, tt.ecsData.eniPublicIPs) + + d := &ECSDiscovery{ + ec2: ec2Client, + cfg: &ECSSDConfig{ + Region: tt.ecsData.region, + RequestConcurrency: 2, + }, + } + + eniMap, err := d.describeNetworkInterfaces(ctx, tt.tasks) + require.NoError(t, err) + require.Equal(t, tt.expected, eniMap) }) } } @@ -481,13 +764,13 @@ func TestECSDiscoveryListTaskARNs(t *testing.T) { // iterate through the test cases for _, tt := range []struct { - name string - ecsData *ecsDataStore - services []ecsTypes.Service - expected map[string][]string + name string + ecsData *ecsDataStore + clusterARNs []string + expected map[string][]string }{ { - name: "ServicesWithTasks", + name: "TasksInCluster", ecsData: &ecsDataStore{ region: "us-west-2", tasks: []ecsTypes.Task{ @@ -511,46 +794,24 @@ func TestECSDiscoveryListTaskARNs(t *testing.T) { }, }, }, - services: []ecsTypes.Service{ - { - ServiceName: strptr("web-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - { - ServiceName: strptr("api-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - }, + clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"}, expected: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service": { + "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": { "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1", "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2", - }, - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service": { "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-3", }, }, }, { - name: "ServiceWithNoTasks", + name: "EmptyCluster", ecsData: &ecsDataStore{ region: "us-west-2", tasks: []ecsTypes.Task{}, }, - services: []ecsTypes.Service{ - { - ServiceName: strptr("empty-service"), - ServiceArn: strptr("arn:aws:ecs:us-west-2:123456789012:service/test-cluster/empty-service"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Status: strptr("RUNNING"), - }, - }, + clusterARNs: []string{"arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"}, expected: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/empty-service": nil, + "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster": nil, }, }, } { @@ -565,7 +826,7 @@ func TestECSDiscoveryListTaskARNs(t *testing.T) { }, } - taskMap, err := d.listTaskARNs(ctx, tt.services) + taskMap, err := d.listTaskARNs(ctx, tt.clusterARNs) require.NoError(t, err) require.Equal(t, tt.expected, taskMap) }) @@ -577,11 +838,11 @@ func TestECSDiscoveryDescribeTasks(t *testing.T) { // iterate through the test cases for _, tt := range []struct { - name string - ecsData *ecsDataStore - clusterARN string - taskARNsMap map[string][]string - expected map[string][]ecsTypes.Task + name string + ecsData *ecsDataStore + clusterARN string + taskARNs []string + expected []ecsTypes.Task }{ { name: "TasksInCluster", @@ -608,47 +869,39 @@ func TestECSDiscoveryDescribeTasks(t *testing.T) { }, }, clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", - taskARNsMap: map[string][]string{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service": { - "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1", - }, - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service": { - "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2", - }, + taskARNs: []string{ + "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1", + "arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2", }, - expected: map[string][]ecsTypes.Task{ - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/web-service": { - { - TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Group: strptr("service:web-service"), - TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), - LastStatus: strptr("RUNNING"), - Tags: []ecsTypes.Tag{ - {Key: strptr("Environment"), Value: strptr("production")}, - }, + expected: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-1"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Group: strptr("service:web-service"), + TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/web-task:1"), + LastStatus: strptr("RUNNING"), + Tags: []ecsTypes.Tag{ + {Key: strptr("Environment"), Value: strptr("production")}, }, }, - "arn:aws:ecs:us-west-2:123456789012:service/test-cluster/api-service": { - { - TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), - ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), - Group: strptr("service:api-service"), - TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), - LastStatus: strptr("RUNNING"), - }, + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/test-cluster/task-2"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster"), + Group: strptr("service:api-service"), + TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/api-task:2"), + LastStatus: strptr("RUNNING"), }, }, }, { - name: "EmptyTaskARNsMap", + name: "EmptyTaskList", ecsData: &ecsDataStore{ region: "us-west-2", tasks: []ecsTypes.Task{}, }, - clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", - taskARNsMap: map[string][]string{}, - expected: map[string][]ecsTypes.Task{}, + clusterARN: "arn:aws:ecs:us-west-2:123456789012:cluster/test-cluster", + taskARNs: []string{}, + expected: nil, }, } { t.Run(tt.name, func(t *testing.T) { @@ -662,9 +915,9 @@ func TestECSDiscoveryDescribeTasks(t *testing.T) { }, } - taskMap, err := d.describeTasks(ctx, tt.clusterARN, tt.taskARNsMap) + tasks, err := d.describeTasks(ctx, tt.clusterARN, tt.taskARNs) require.NoError(t, err) - require.Equal(t, tt.expected, taskMap) + require.Equal(t, tt.expected, tasks) }) } } @@ -836,6 +1089,75 @@ func TestECSDiscoveryRefresh(t *testing.T) { }, }, }, + { + name: "StandaloneTaskNoService", + ecsData: &ecsDataStore{ + region: "us-west-2", + clusters: []ecsTypes.Cluster{ + { + ClusterName: strptr("standalone-cluster"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/standalone-cluster"), + Status: strptr("ACTIVE"), + }, + }, + services: []ecsTypes.Service{}, + tasks: []ecsTypes.Task{ + { + TaskArn: strptr("arn:aws:ecs:us-west-2:123456789012:task/standalone-cluster/task-standalone"), + ClusterArn: strptr("arn:aws:ecs:us-west-2:123456789012:cluster/standalone-cluster"), + TaskDefinitionArn: strptr("arn:aws:ecs:us-west-2:123456789012:task-definition/standalone-task:1"), + Group: strptr("family:standalone-task"), + LaunchType: ecsTypes.LaunchTypeFargate, + LastStatus: strptr("RUNNING"), + DesiredStatus: strptr("RUNNING"), + HealthStatus: ecsTypes.HealthStatusHealthy, + AvailabilityZone: strptr("us-west-2a"), + Attachments: []ecsTypes.Attachment{ + { + Type: strptr("ElasticNetworkInterface"), + Details: []ecsTypes.KeyValuePair{ + {Name: strptr("subnetId"), Value: strptr("subnet-standalone-1")}, + {Name: strptr("privateIPv4Address"), Value: strptr("10.0.4.10")}, + {Name: strptr("networkInterfaceId"), Value: strptr("eni-standalone-123")}, + }, + }, + }, + Tags: []ecsTypes.Tag{ + {Key: strptr("Role"), Value: strptr("batch")}, + }, + }, + }, + eniPublicIPs: map[string]string{ + "eni-standalone-123": "52.4.5.6", + }, + }, + expected: []*targetgroup.Group{ + { + Source: "us-west-2", + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("10.0.4.10:80"), + "__meta_ecs_cluster": model.LabelValue("standalone-cluster"), + "__meta_ecs_cluster_arn": model.LabelValue("arn:aws:ecs:us-west-2:123456789012:cluster/standalone-cluster"), + "__meta_ecs_task_group": model.LabelValue("family:standalone-task"), + "__meta_ecs_task_arn": model.LabelValue("arn:aws:ecs:us-west-2:123456789012:task/standalone-cluster/task-standalone"), + "__meta_ecs_task_definition": model.LabelValue("arn:aws:ecs:us-west-2:123456789012:task-definition/standalone-task:1"), + "__meta_ecs_region": model.LabelValue("us-west-2"), + "__meta_ecs_availability_zone": model.LabelValue("us-west-2a"), + "__meta_ecs_subnet_id": model.LabelValue("subnet-standalone-1"), + "__meta_ecs_ip_address": model.LabelValue("10.0.4.10"), + "__meta_ecs_launch_type": model.LabelValue("FARGATE"), + "__meta_ecs_desired_status": model.LabelValue("RUNNING"), + "__meta_ecs_last_status": model.LabelValue("RUNNING"), + "__meta_ecs_health_status": model.LabelValue("HEALTHY"), + "__meta_ecs_network_mode": model.LabelValue("awsvpc"), + "__meta_ecs_public_ip": model.LabelValue("52.4.5.6"), + "__meta_ecs_tag_task_Role": model.LabelValue("batch"), + }, + }, + }, + }, + }, { name: "TaskWithBridgeNetworking", ecsData: &ecsDataStore{ @@ -1184,7 +1506,14 @@ func TestECSDiscoveryRefresh(t *testing.T) { groups, err := d.refresh(ctx) require.NoError(t, err) - require.Equal(t, tt.expected, groups) + if tt.name == "MixedNetworkingModes" { + // Use ElementsMatch for tests with multiple tasks as goroutines can affect order + require.Len(t, groups, len(tt.expected)) + require.Equal(t, tt.expected[0].Source, groups[0].Source) + require.ElementsMatch(t, tt.expected[0].Targets, groups[0].Targets) + } else { + require.Equal(t, tt.expected, groups) + } }) } } @@ -1381,3 +1710,98 @@ func (m *mockECSEC2Client) DescribeNetworkInterfaces(_ context.Context, input *e NetworkInterfaces: networkInterfaces, }, nil } + +func TestIsStandaloneTask(t *testing.T) { + tests := []struct { + name string + task ecsTypes.Task + expected bool + }{ + { + name: "StandaloneTask", + task: ecsTypes.Task{ + Group: strptr("family:my-task-definition"), + }, + expected: true, + }, + { + name: "ServiceTask", + task: ecsTypes.Task{ + Group: strptr("service:my-service"), + }, + expected: false, + }, + { + name: "ServiceTaskWithColon", + task: ecsTypes.Task{ + Group: strptr("service:my:service:name"), + }, + expected: false, + }, + { + name: "NilGroup", + task: ecsTypes.Task{ + Group: nil, + }, + expected: false, + }, + { + name: "EmptyGroup", + task: ecsTypes.Task{ + Group: strptr(""), + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isStandaloneTask(tt.task) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestGetServiceNameFromTaskGroup(t *testing.T) { + tests := []struct { + name string + task ecsTypes.Task + expected string + }{ + { + name: "SimpleServiceName", + task: ecsTypes.Task{ + Group: strptr("service:my-service"), + }, + expected: "my-service", + }, + { + name: "ServiceNameWithHyphens", + task: ecsTypes.Task{ + Group: strptr("service:web-api-service"), + }, + expected: "web-api-service", + }, + { + name: "ServiceNameWithColons", + task: ecsTypes.Task{ + Group: strptr("service:my:service:name"), + }, + expected: "my", + }, + { + name: "FamilyGroup", + task: ecsTypes.Task{ + Group: strptr("family:my-task-def"), + }, + expected: "my-task-def", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getServiceNameFromTaskGroup(tt.task) + require.Equal(t, tt.expected, result) + }) + } +}