From 16834a0d7825a08ec7f3aa952320195ac5faca78 Mon Sep 17 00:00:00 2001 From: Brad Warren Date: Fri, 17 May 2019 11:36:58 -0700 Subject: [PATCH] Stop sharing state between processes in test farm tests (#7057) * Set LOGDIR at top of script. * Set sentinel at top of script. * Don't use EC2 global to block on instance start. * Remove global boto3 state. * Pass in boulder_url. * Create main function. * Add link to reload docs. --- tests/letstest/multitester.py | 467 +++++++++++++++++----------------- 1 file changed, 237 insertions(+), 230 deletions(-) diff --git a/tests/letstest/multitester.py b/tests/letstest/multitester.py index 37a4a40c1..39f8739df 100644 --- a/tests/letstest/multitester.py +++ b/tests/letstest/multitester.py @@ -99,11 +99,9 @@ PROFILE = cl_args.aws_profile # Globals #------------------------------------------------------------------------------- BOULDER_AMI = 'ami-072a9534772bec854' # premade shared boulder AMI 18.04LTS us-east-1 -LOGDIR = "" #points to logging / working directory -# boto3/AWS api globals -AWS_SESSION = None -EC2 = None +LOGDIR = "letest-%d"%int(time.time()) #points to logging / working directory SECURITY_GROUP_NAME = 'certbot-security-group' +SENTINEL = None #queue kill signal SUBNET_NAME = 'certbot-subnet' class Status(object): @@ -144,28 +142,30 @@ def make_security_group(vpc): mysg.authorize_ingress(IpProtocol="udp", CidrIp="0.0.0.0/0", FromPort=60000, ToPort=61000) return mysg -def make_instance(instance_name, +def make_instance(ec2_client, + instance_name, ami_id, keyname, security_group_id, subnet_id, machine_type='t2.micro', userdata=""): #userdata contains bash or cloud-init script - block_device_mappings = _get_block_device_mappings(ami_id) + block_device_mappings = _get_block_device_mappings(ec2_client, ami_id) tags = [{'Key': 'Name', 'Value': instance_name}] tag_spec = [{'ResourceType': 'instance', 'Tags': tags}] - return EC2.create_instances(BlockDeviceMappings=block_device_mappings, - ImageId=ami_id, - SecurityGroupIds=[security_group_id], - SubnetId=subnet_id, - KeyName=keyname, - MinCount=1, - MaxCount=1, - UserData=userdata, - InstanceType=machine_type, - TagSpecifications=tag_spec)[0] + return ec2_client.create_instances( + BlockDeviceMappings=block_device_mappings, + ImageId=ami_id, + SecurityGroupIds=[security_group_id], + SubnetId=subnet_id, + KeyName=keyname, + MinCount=1, + MaxCount=1, + UserData=userdata, + InstanceType=machine_type, + TagSpecifications=tag_spec)[0] -def _get_block_device_mappings(ami_id): +def _get_block_device_mappings(ec2_client, ami_id): """Returns the list of block device mappings to ensure cleanup. This list sets connected EBS volumes to be deleted when the EC2 @@ -178,7 +178,7 @@ def _get_block_device_mappings(ami_id): # * https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-blockdev-template.html return [{'DeviceName': mapping['DeviceName'], 'Ebs': {'DeleteOnTermination': True}} - for mapping in EC2.Image(ami_id).block_device_mappings + for mapping in ec2_client.Image(ami_id).block_device_mappings if not mapping.get('Ebs', {}).get('DeleteOnTermination', True)] @@ -217,20 +217,18 @@ def block_until_ssh_open(ipstring, wait_time=10, timeout=120): def block_until_instance_ready(booting_instance, wait_time=5, extra_wait_time=20): "Blocks booting_instance until AWS EC2 instance is ready to accept SSH connections" - # the reinstantiation from id is necessary to force boto3 - # to correctly update the 'state' variable during init - _id = booting_instance.id - _instance = EC2.Instance(id=_id) - _state = _instance.state['Name'] - _ip = _instance.public_ip_address - while _state != 'running' or _ip is None: + state = booting_instance.state['Name'] + ip = booting_instance.public_ip_address + while state != 'running' or ip is None: time.sleep(wait_time) - _instance = EC2.Instance(id=_id) - _state = _instance.state['Name'] - _ip = _instance.public_ip_address - block_until_ssh_open(_ip) + # The instance needs to be reloaded to update its local attributes. See + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Instance.reload. + booting_instance.reload() + state = booting_instance.state['Name'] + ip = booting_instance.public_ip_address + block_until_ssh_open(ip) time.sleep(extra_wait_time) - return _instance + return booting_instance # Fabric Routines @@ -307,7 +305,7 @@ def grab_certbot_log(): cat ./certbot.log; else echo "[nolocallog]"; fi') -def create_client_instance(target, security_group_id, subnet_id): +def create_client_instance(ec2_client, target, security_group_id, subnet_id): """Create a single client instance for running tests.""" if target['virt'] == 'hvm': machine_type = 't2.medium' if cl_args.fast else 't2.micro' @@ -320,7 +318,8 @@ def create_client_instance(target, security_group_id, subnet_id): userdata = '' name = 'le-%s'%target['name'] print(name, end=" ") - return make_instance(name, + return make_instance(ec2_client, + name, target['ami'], KEYNAME, machine_type=machine_type, @@ -329,22 +328,28 @@ def create_client_instance(target, security_group_id, subnet_id): userdata=userdata) -def test_client_process(inqueue, outqueue): +def test_client_process(inqueue, outqueue, boulder_url): cur_proc = mp.current_process() for inreq in iter(inqueue.get, SENTINEL): - ii, target = inreq + ii, instance_id, target = inreq + + # Each client process is given its own session due to the suggestion at + # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-multiprocessing. + aws_session = boto3.session.Session(profile_name=PROFILE) + ec2_client = aws_session.resource('ec2') + instance = ec2_client.Instance(id=instance_id) #save all stdout to log file sys.stdout = open(LOGDIR+'/'+'%d_%s.log'%(ii,target['name']), 'w') print("[%s : client %d %s %s]" % (cur_proc.name, ii, target['ami'], target['name'])) - instances[ii] = block_until_instance_ready(instances[ii]) - print("server %s at %s"%(instances[ii], instances[ii].public_ip_address)) - env.host_string = "%s@%s"%(target['user'], instances[ii].public_ip_address) + instance = block_until_instance_ready(instance) + print("server %s at %s"%(instance, instance.public_ip_address)) + env.host_string = "%s@%s"%(target['user'], instance.public_ip_address) print(env.host_string) try: - install_and_launch_certbot(instances[ii], boulder_url, target) + install_and_launch_certbot(instance, boulder_url, target) outqueue.put((ii, target, Status.PASS)) print("%s - %s SUCCESS"%(target['ami'], target['name'])) except: @@ -382,203 +387,205 @@ def cleanup(cl_args, instances, targetlist): "%s@%s"%(target['user'], instances[ii].public_ip_address)) +def main(): + # Fabric library controlled through global env parameters + env.key_filename = KEYFILE + env.shell = '/bin/bash -l -i -c' + env.connection_attempts = 5 + env.timeout = 10 + # replace default SystemExit thrown by fabric during trouble + class FabricException(Exception): + pass + env['abort_exception'] = FabricException -#------------------------------------------------------------------------------- -# SCRIPT BEGINS -#------------------------------------------------------------------------------- + # Set up local copy of git repo + #------------------------------------------------------------------------------- + print("Making local dir for test repo and logs: %s"%LOGDIR) + local('mkdir %s'%LOGDIR) -# Fabric library controlled through global env parameters -env.key_filename = KEYFILE -env.shell = '/bin/bash -l -i -c' -env.connection_attempts = 5 -env.timeout = 10 -# replace default SystemExit thrown by fabric during trouble -class FabricException(Exception): - pass -env['abort_exception'] = FabricException + # figure out what git object to test and locally create it in LOGDIR + print("Making local git repo") + try: + if cl_args.pull_request != '~': + print('Testing PR %s '%cl_args.pull_request, + "MERGING into master" if cl_args.merge_master else "") + execute(local_git_PR, cl_args.repo, cl_args.pull_request, cl_args.merge_master) + elif cl_args.branch != '~': + print('Testing branch %s of %s'%(cl_args.branch, cl_args.repo)) + execute(local_git_branch, cl_args.repo, cl_args.branch) + else: + print('Testing master of %s'%cl_args.repo) + execute(local_git_clone, cl_args.repo) + except FabricException: + print("FAIL: trouble with git repo") + traceback.print_exc() + exit() -# Set up local copy of git repo -#------------------------------------------------------------------------------- -LOGDIR = "letest-%d"%int(time.time()) -print("Making local dir for test repo and logs: %s"%LOGDIR) -local('mkdir %s'%LOGDIR) -# figure out what git object to test and locally create it in LOGDIR -print("Making local git repo") -try: - if cl_args.pull_request != '~': - print('Testing PR %s '%cl_args.pull_request, - "MERGING into master" if cl_args.merge_master else "") - execute(local_git_PR, cl_args.repo, cl_args.pull_request, cl_args.merge_master) - elif cl_args.branch != '~': - print('Testing branch %s of %s'%(cl_args.branch, cl_args.repo)) - execute(local_git_branch, cl_args.repo, cl_args.branch) + # Set up EC2 instances + #------------------------------------------------------------------------------- + configdata = yaml.load(open(cl_args.config_file, 'r')) + targetlist = configdata['targets'] + print('Testing against these images: [%d total]'%len(targetlist)) + for target in targetlist: + print(target['ami'], target['name']) + + print("Connecting to EC2 using\n profile %s\n keyname %s\n keyfile %s"%(PROFILE, KEYNAME, KEYFILE)) + aws_session = boto3.session.Session(profile_name=PROFILE) + ec2_client = aws_session.resource('ec2') + + print("Determining Subnet") + for subnet in ec2_client.subnets.all(): + if should_use_subnet(subnet): + subnet_id = subnet.id + vpc_id = subnet.vpc.id + break else: - print('Testing master of %s'%cl_args.repo) - execute(local_git_clone, cl_args.repo) -except FabricException: - print("FAIL: trouble with git repo") - traceback.print_exc() - exit() - - -# Set up EC2 instances -#------------------------------------------------------------------------------- -configdata = yaml.load(open(cl_args.config_file, 'r')) -targetlist = configdata['targets'] -print('Testing against these images: [%d total]'%len(targetlist)) -for target in targetlist: - print(target['ami'], target['name']) - -print("Connecting to EC2 using\n profile %s\n keyname %s\n keyfile %s"%(PROFILE, KEYNAME, KEYFILE)) -AWS_SESSION = boto3.session.Session(profile_name=PROFILE) -EC2 = AWS_SESSION.resource('ec2') - -print("Determining Subnet") -for subnet in EC2.subnets.all(): - if should_use_subnet(subnet): - subnet_id = subnet.id - vpc_id = subnet.vpc.id - break -else: - print("No usable subnet exists!") - print("Please create a VPC with a subnet named {0}".format(SUBNET_NAME)) - print("that maps public IPv4 addresses to instances launched in the subnet.") - sys.exit(1) - -print("Making Security Group") -vpc = EC2.Vpc(vpc_id) -sg_exists = False -for sg in vpc.security_groups.all(): - if sg.group_name == SECURITY_GROUP_NAME: - security_group_id = sg.id - sg_exists = True - print(" %s already exists"%SECURITY_GROUP_NAME) -if not sg_exists: - security_group_id = make_security_group(vpc).id - time.sleep(30) - -boulder_preexists = False -boulder_servers = EC2.instances.filter(Filters=[ - {'Name': 'tag:Name', 'Values': ['le-boulderserver']}, - {'Name': 'instance-state-name', 'Values': ['running']}]) - -boulder_server = next(iter(boulder_servers), None) - -print("Requesting Instances...") -if boulder_server: - print("Found existing boulder server:", boulder_server) - boulder_preexists = True -else: - print("Can't find a boulder server, starting one...") - boulder_server = make_instance('le-boulderserver', - BOULDER_AMI, - KEYNAME, - machine_type='t2.micro', - #machine_type='t2.medium', - security_group_id=security_group_id, - subnet_id=subnet_id) - -instances = [] -try: - if not cl_args.boulderonly: - print("Creating instances: ", end="") - for target in targetlist: - instances.append(create_client_instance(target, security_group_id, subnet_id)) - print() - - # Configure and launch boulder server - #------------------------------------------------------------------------------- - print("Waiting on Boulder Server") - boulder_server = block_until_instance_ready(boulder_server) - print(" server %s"%boulder_server) - - - # env.host_string defines the ssh user and host for connection - env.host_string = "ubuntu@%s"%boulder_server.public_ip_address - print("Boulder Server at (SSH):", env.host_string) - if not boulder_preexists: - print("Configuring and Launching Boulder") - config_and_launch_boulder(boulder_server) - # blocking often unnecessary, but cheap EC2 VMs can get very slow - block_until_http_ready('http://%s:4000'%boulder_server.public_ip_address, - wait_time=10, timeout=500) - - boulder_url = "http://%s:4000/directory"%boulder_server.private_ip_address - print("Boulder Server at (public ip): http://%s:4000/directory"%boulder_server.public_ip_address) - print("Boulder Server at (EC2 private ip): %s"%boulder_url) - - if cl_args.boulderonly: - sys.exit(0) - - # Install and launch client scripts in parallel - #------------------------------------------------------------------------------- - print("Uploading and running test script in parallel: %s"%cl_args.test_script) - print("Output routed to log files in %s"%LOGDIR) - # (Advice: always use Manager.Queue, never regular multiprocessing.Queue - # the latter has implementation flaws that deadlock it in some circumstances) - manager = Manager() - outqueue = manager.Queue() - inqueue = manager.Queue() - SENTINEL = None #queue kill signal - - # launch as many processes as clients to test - num_processes = len(targetlist) - jobs = [] #keep a reference to current procs - - - # initiate process execution - for i in range(num_processes): - p = mp.Process(target=test_client_process, args=(inqueue, outqueue)) - jobs.append(p) - p.daemon = True # kills subprocesses if parent is killed - p.start() - - # fill up work queue - for ii, target in enumerate(targetlist): - inqueue.put((ii, target)) - - # add SENTINELs to end client processes - for i in range(num_processes): - inqueue.put(SENTINEL) - print('Waiting on client processes', end='') - for p in jobs: - while p.is_alive(): - p.join(5 * 60) - # Regularly print output to keep Travis happy - print('.', end='') - sys.stdout.flush() - print() - # add SENTINEL to output queue - outqueue.put(SENTINEL) - - # clean up - execute(local_repo_clean) - - # print and save summary results - results_file = open(LOGDIR+'/results', 'w') - outputs = [outq for outq in iter(outqueue.get, SENTINEL)] - outputs.sort(key=lambda x: x[0]) - failed = False - for outq in outputs: - ii, target, status = outq - if status == Status.FAIL: - failed = True - print('%d %s %s'%(ii, target['name'], status)) - results_file.write('%d %s %s\n'%(ii, target['name'], status)) - if len(outputs) != num_processes: - failed = True - failure_message = 'FAILURE: Some target machines failed to run and were not tested. ' +\ - 'Tests should be rerun.' - print(failure_message) - results_file.write(failure_message + '\n') - results_file.close() - - if failed: + print("No usable subnet exists!") + print("Please create a VPC with a subnet named {0}".format(SUBNET_NAME)) + print("that maps public IPv4 addresses to instances launched in the subnet.") sys.exit(1) -finally: - cleanup(cl_args, instances, targetlist) + print("Making Security Group") + vpc = ec2_client.Vpc(vpc_id) + sg_exists = False + for sg in vpc.security_groups.all(): + if sg.group_name == SECURITY_GROUP_NAME: + security_group_id = sg.id + sg_exists = True + print(" %s already exists"%SECURITY_GROUP_NAME) + if not sg_exists: + security_group_id = make_security_group(vpc).id + time.sleep(30) - # kill any connections - fabric.network.disconnect_all() + boulder_preexists = False + boulder_servers = ec2_client.instances.filter(Filters=[ + {'Name': 'tag:Name', 'Values': ['le-boulderserver']}, + {'Name': 'instance-state-name', 'Values': ['running']}]) + + boulder_server = next(iter(boulder_servers), None) + + print("Requesting Instances...") + if boulder_server: + print("Found existing boulder server:", boulder_server) + boulder_preexists = True + else: + print("Can't find a boulder server, starting one...") + boulder_server = make_instance(ec2_client, + 'le-boulderserver', + BOULDER_AMI, + KEYNAME, + machine_type='t2.micro', + #machine_type='t2.medium', + security_group_id=security_group_id, + subnet_id=subnet_id) + + instances = [] + try: + if not cl_args.boulderonly: + print("Creating instances: ", end="") + for target in targetlist: + instances.append( + create_client_instance(ec2_client, target, + security_group_id, subnet_id) + ) + print() + + # Configure and launch boulder server + #------------------------------------------------------------------------------- + print("Waiting on Boulder Server") + boulder_server = block_until_instance_ready(boulder_server) + print(" server %s"%boulder_server) + + + # env.host_string defines the ssh user and host for connection + env.host_string = "ubuntu@%s"%boulder_server.public_ip_address + print("Boulder Server at (SSH):", env.host_string) + if not boulder_preexists: + print("Configuring and Launching Boulder") + config_and_launch_boulder(boulder_server) + # blocking often unnecessary, but cheap EC2 VMs can get very slow + block_until_http_ready('http://%s:4000'%boulder_server.public_ip_address, + wait_time=10, timeout=500) + + boulder_url = "http://%s:4000/directory"%boulder_server.private_ip_address + print("Boulder Server at (public ip): http://%s:4000/directory"%boulder_server.public_ip_address) + print("Boulder Server at (EC2 private ip): %s"%boulder_url) + + if cl_args.boulderonly: + sys.exit(0) + + # Install and launch client scripts in parallel + #------------------------------------------------------------------------------- + print("Uploading and running test script in parallel: %s"%cl_args.test_script) + print("Output routed to log files in %s"%LOGDIR) + # (Advice: always use Manager.Queue, never regular multiprocessing.Queue + # the latter has implementation flaws that deadlock it in some circumstances) + manager = Manager() + outqueue = manager.Queue() + inqueue = manager.Queue() + + # launch as many processes as clients to test + num_processes = len(targetlist) + jobs = [] #keep a reference to current procs + + + # initiate process execution + for i in range(num_processes): + p = mp.Process(target=test_client_process, args=(inqueue, outqueue, boulder_url)) + jobs.append(p) + p.daemon = True # kills subprocesses if parent is killed + p.start() + + # fill up work queue + for ii, target in enumerate(targetlist): + inqueue.put((ii, instances[ii].id, target)) + + # add SENTINELs to end client processes + for i in range(num_processes): + inqueue.put(SENTINEL) + print('Waiting on client processes', end='') + for p in jobs: + while p.is_alive(): + p.join(5 * 60) + # Regularly print output to keep Travis happy + print('.', end='') + sys.stdout.flush() + print() + # add SENTINEL to output queue + outqueue.put(SENTINEL) + + # clean up + execute(local_repo_clean) + + # print and save summary results + results_file = open(LOGDIR+'/results', 'w') + outputs = [outq for outq in iter(outqueue.get, SENTINEL)] + outputs.sort(key=lambda x: x[0]) + failed = False + for outq in outputs: + ii, target, status = outq + if status == Status.FAIL: + failed = True + print('%d %s %s'%(ii, target['name'], status)) + results_file.write('%d %s %s\n'%(ii, target['name'], status)) + if len(outputs) != num_processes: + failed = True + failure_message = 'FAILURE: Some target machines failed to run and were not tested. ' +\ + 'Tests should be rerun.' + print(failure_message) + results_file.write(failure_message + '\n') + results_file.close() + + if failed: + sys.exit(1) + + finally: + cleanup(cl_args, instances, targetlist) + + # kill any connections + fabric.network.disconnect_all() + + +if __name__ == '__main__': + main()