diff --git a/tests/letstest/multitester.py b/tests/letstest/multitester.py index 430acb634..19b49c413 100644 --- a/tests/letstest/multitester.py +++ b/tests/letstest/multitester.py @@ -99,11 +99,9 @@ PROFILE = cl_args.aws_profile # Globals #------------------------------------------------------------------------------- BOULDER_AMI = 'ami-072a9534772bec854' # premade shared boulder AMI 18.04LTS us-east-1 -LOGDIR = "" #points to logging / working directory -# boto3/AWS api globals -AWS_SESSION = None -EC2 = None +LOGDIR = "letest-%d"%int(time.time()) #points to logging / working directory SECURITY_GROUP_NAME = 'certbot-security-group' +SENTINEL = None #queue kill signal SUBNET_NAME = 'certbot-subnet' # Boto3/AWS automation functions @@ -139,7 +137,8 @@ def make_security_group(vpc): mysg.authorize_ingress(IpProtocol="udp", CidrIp="0.0.0.0/0", FromPort=60000, ToPort=61000) return mysg -def make_instance(instance_name, +def make_instance(ec2_client, + instance_name, ami_id, keyname, security_group_id, @@ -147,8 +146,8 @@ def make_instance(instance_name, machine_type='t2.micro', userdata=""): #userdata contains bash or cloud-init script - new_instance = EC2.create_instances( - BlockDeviceMappings=_get_block_device_mappings(ami_id), + new_instance = ec2_client.create_instances( + BlockDeviceMappings=_get_block_device_mappings(ec2_client, ami_id), ImageId=ami_id, SecurityGroupIds=[security_group_id], SubnetId=subnet_id, @@ -173,7 +172,7 @@ def make_instance(instance_name, raise return new_instance -def _get_block_device_mappings(ami_id): +def _get_block_device_mappings(ec2_client, ami_id): """Returns the list of block device mappings to ensure cleanup. This list sets connected EBS volumes to be deleted when the EC2 @@ -186,7 +185,7 @@ def _get_block_device_mappings(ami_id): # * https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ec2-blockdev-template.html return [{'DeviceName': mapping['DeviceName'], 'Ebs': {'DeleteOnTermination': True}} - for mapping in EC2.Image(ami_id).block_device_mappings + for mapping in ec2_client.Image(ami_id).block_device_mappings if not mapping.get('Ebs', {}).get('DeleteOnTermination', True)] @@ -225,20 +224,17 @@ def block_until_ssh_open(ipstring, wait_time=10, timeout=120): def block_until_instance_ready(booting_instance, wait_time=5, extra_wait_time=20): "Blocks booting_instance until AWS EC2 instance is ready to accept SSH connections" - # the reinstantiation from id is necessary to force boto3 - # to correctly update the 'state' variable during init - _id = booting_instance.id - _instance = EC2.Instance(id=_id) - _state = _instance.state['Name'] - _ip = _instance.public_ip_address - while _state != 'running' or _ip is None: + state = booting_instance.state['Name'] + ip = booting_instance.public_ip_address + while state != 'running' or ip is None: time.sleep(wait_time) - _instance = EC2.Instance(id=_id) - _state = _instance.state['Name'] - _ip = _instance.public_ip_address - block_until_ssh_open(_ip) + # The instance needs to be reloaded to update its local attributes. + booting_instance.reload() + state = booting_instance.state['Name'] + ip = booting_instance.public_ip_address + block_until_ssh_open(ip) time.sleep(extra_wait_time) - return _instance + return booting_instance # Fabric Routines @@ -314,7 +310,7 @@ def grab_certbot_log(): sudo('if [ -f ./certbot.log ]; then \ cat ./certbot.log; else echo "[nolocallog]"; fi') -def create_client_instances(targetlist, security_group_id, subnet_id): +def create_client_instances(ec2_client, targetlist, security_group_id, subnet_id): "Create a fleet of client instances" instances = [] print("Creating instances: ", end="") @@ -330,7 +326,8 @@ def create_client_instances(targetlist, security_group_id, subnet_id): userdata = '' name = 'le-%s'%target['name'] print(name, end=" ") - instances.append(make_instance(name, + instances.append(make_instance(ec2_client, + name, target['ami'], KEYNAME, machine_type=machine_type, @@ -341,22 +338,28 @@ def create_client_instances(targetlist, security_group_id, subnet_id): return instances -def test_client_process(inqueue, outqueue): +def test_client_process(inqueue, outqueue, boulder_url): cur_proc = mp.current_process() for inreq in iter(inqueue.get, SENTINEL): - ii, target = inreq + ii, instance_id, target = inreq + + # Each client process is given its own session due to the suggestion at + # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-multiprocessing. + aws_session = boto3.session.Session(profile_name=PROFILE) + ec2_client = aws_session.resource('ec2') + instance = ec2_client.Instance(id=instance_id) #save all stdout to log file sys.stdout = open(LOGDIR+'/'+'%d_%s.log'%(ii,target['name']), 'w') print("[%s : client %d %s %s]" % (cur_proc.name, ii, target['ami'], target['name'])) - instances[ii] = block_until_instance_ready(instances[ii]) - print("server %s at %s"%(instances[ii], instances[ii].public_ip_address)) - env.host_string = "%s@%s"%(target['user'], instances[ii].public_ip_address) + instance = block_until_instance_ready(instance) + print("server %s at %s"%(instance, instance.public_ip_address)) + env.host_string = "%s@%s"%(target['user'], instance.public_ip_address) print(env.host_string) try: - install_and_launch_certbot(instances[ii], boulder_url, target) + install_and_launch_certbot(instance, boulder_url, target) outqueue.put((ii, target, 'pass')) print("%s - %s SUCCESS"%(target['ami'], target['name'])) except: @@ -391,187 +394,186 @@ def cleanup(cl_args, instances, targetlist): "%s@%s"%(target['user'], instances[ii].public_ip_address)) +def main(): + # Fabric library controlled through global env parameters + env.key_filename = KEYFILE + env.shell = '/bin/bash -l -i -c' + env.connection_attempts = 5 + env.timeout = 10 + # replace default SystemExit thrown by fabric during trouble + class FabricException(Exception): + pass + env['abort_exception'] = FabricException -#------------------------------------------------------------------------------- -# SCRIPT BEGINS -#------------------------------------------------------------------------------- + # Set up local copy of git repo + #------------------------------------------------------------------------------- + print("Making local dir for test repo and logs: %s"%LOGDIR) + local('mkdir %s'%LOGDIR) -# Fabric library controlled through global env parameters -env.key_filename = KEYFILE -env.shell = '/bin/bash -l -i -c' -env.connection_attempts = 5 -env.timeout = 10 -# replace default SystemExit thrown by fabric during trouble -class FabricException(Exception): - pass -env['abort_exception'] = FabricException + # figure out what git object to test and locally create it in LOGDIR + print("Making local git repo") + try: + if cl_args.pull_request != '~': + print('Testing PR %s '%cl_args.pull_request, + "MERGING into master" if cl_args.merge_master else "") + execute(local_git_PR, cl_args.repo, cl_args.pull_request, cl_args.merge_master) + elif cl_args.branch != '~': + print('Testing branch %s of %s'%(cl_args.branch, cl_args.repo)) + execute(local_git_branch, cl_args.repo, cl_args.branch) + else: + print('Testing master of %s'%cl_args.repo) + execute(local_git_clone, cl_args.repo) + except FabricException: + print("FAIL: trouble with git repo") + traceback.print_exc() + exit() -# Set up local copy of git repo -#------------------------------------------------------------------------------- -LOGDIR = "letest-%d"%int(time.time()) -print("Making local dir for test repo and logs: %s"%LOGDIR) -local('mkdir %s'%LOGDIR) -# figure out what git object to test and locally create it in LOGDIR -print("Making local git repo") -try: - if cl_args.pull_request != '~': - print('Testing PR %s '%cl_args.pull_request, - "MERGING into master" if cl_args.merge_master else "") - execute(local_git_PR, cl_args.repo, cl_args.pull_request, cl_args.merge_master) - elif cl_args.branch != '~': - print('Testing branch %s of %s'%(cl_args.branch, cl_args.repo)) - execute(local_git_branch, cl_args.repo, cl_args.branch) + # Set up EC2 instances + #------------------------------------------------------------------------------- + configdata = yaml.load(open(cl_args.config_file, 'r')) + targetlist = configdata['targets'] + print('Testing against these images: [%d total]'%len(targetlist)) + for target in targetlist: + print(target['ami'], target['name']) + + print("Connecting to EC2 using\n profile %s\n keyname %s\n keyfile %s"%(PROFILE, KEYNAME, KEYFILE)) + aws_session = boto3.session.Session(profile_name=PROFILE) + ec2_client = aws_session.resource('ec2') + + print("Determining Subnet") + for subnet in ec2_client.subnets.all(): + if should_use_subnet(subnet): + subnet_id = subnet.id + vpc_id = subnet.vpc.id + break else: - print('Testing master of %s'%cl_args.repo) - execute(local_git_clone, cl_args.repo) -except FabricException: - print("FAIL: trouble with git repo") - traceback.print_exc() - exit() + print("No usable subnet exists!") + print("Please create a VPC with a subnet named {0}".format(SUBNET_NAME)) + print("that maps public IPv4 addresses to instances launched in the subnet.") + sys.exit(1) + + print("Making Security Group") + vpc = ec2_client.Vpc(vpc_id) + sg_exists = False + for sg in vpc.security_groups.all(): + if sg.group_name == SECURITY_GROUP_NAME: + security_group_id = sg.id + sg_exists = True + print(" %s already exists"%SECURITY_GROUP_NAME) + if not sg_exists: + security_group_id = make_security_group(vpc).id + time.sleep(30) + + boulder_preexists = False + boulder_servers = ec2_client.instances.filter(Filters=[ + {'Name': 'tag:Name', 'Values': ['le-boulderserver']}, + {'Name': 'instance-state-name', 'Values': ['running']}]) + + boulder_server = next(iter(boulder_servers), None) + + print("Requesting Instances...") + if boulder_server: + print("Found existing boulder server:", boulder_server) + boulder_preexists = True + else: + print("Can't find a boulder server, starting one...") + boulder_server = make_instance(ec2_client, + 'le-boulderserver', + BOULDER_AMI, + KEYNAME, + machine_type='t2.micro', + #machine_type='t2.medium', + security_group_id=security_group_id, + subnet_id=subnet_id) + + try: + if not cl_args.boulderonly: + instances = create_client_instances(ec2_client, targetlist, security_group_id, subnet_id) + + # Configure and launch boulder server + #------------------------------------------------------------------------------- + print("Waiting on Boulder Server") + boulder_server = block_until_instance_ready(boulder_server) + print(" server %s"%boulder_server) -# Set up EC2 instances -#------------------------------------------------------------------------------- -configdata = yaml.load(open(cl_args.config_file, 'r')) -targetlist = configdata['targets'] -print('Testing against these images: [%d total]'%len(targetlist)) -for target in targetlist: - print(target['ami'], target['name']) + # env.host_string defines the ssh user and host for connection + env.host_string = "ubuntu@%s"%boulder_server.public_ip_address + print("Boulder Server at (SSH):", env.host_string) + if not boulder_preexists: + print("Configuring and Launching Boulder") + config_and_launch_boulder(boulder_server) + # blocking often unnecessary, but cheap EC2 VMs can get very slow + block_until_http_ready('http://%s:4000'%boulder_server.public_ip_address, + wait_time=10, timeout=500) -print("Connecting to EC2 using\n profile %s\n keyname %s\n keyfile %s"%(PROFILE, KEYNAME, KEYFILE)) -AWS_SESSION = boto3.session.Session(profile_name=PROFILE) -EC2 = AWS_SESSION.resource('ec2') + boulder_url = "http://%s:4000/directory"%boulder_server.private_ip_address + print("Boulder Server at (public ip): http://%s:4000/directory"%boulder_server.public_ip_address) + print("Boulder Server at (EC2 private ip): %s"%boulder_url) -print("Determining Subnet") -for subnet in EC2.subnets.all(): - if should_use_subnet(subnet): - subnet_id = subnet.id - vpc_id = subnet.vpc.id - break -else: - print("No usable subnet exists!") - print("Please create a VPC with a subnet named {0}".format(SUBNET_NAME)) - print("that maps public IPv4 addresses to instances launched in the subnet.") - sys.exit(1) + if cl_args.boulderonly: + sys.exit(0) -print("Making Security Group") -vpc = EC2.Vpc(vpc_id) -sg_exists = False -for sg in vpc.security_groups.all(): - if sg.group_name == SECURITY_GROUP_NAME: - security_group_id = sg.id - sg_exists = True - print(" %s already exists"%SECURITY_GROUP_NAME) -if not sg_exists: - security_group_id = make_security_group(vpc).id - time.sleep(30) + # Install and launch client scripts in parallel + #------------------------------------------------------------------------------- + print("Uploading and running test script in parallel: %s"%cl_args.test_script) + print("Output routed to log files in %s"%LOGDIR) + # (Advice: always use Manager.Queue, never regular multiprocessing.Queue + # the latter has implementation flaws that deadlock it in some circumstances) + manager = Manager() + outqueue = manager.Queue() + inqueue = manager.Queue() -boulder_preexists = False -boulder_servers = EC2.instances.filter(Filters=[ - {'Name': 'tag:Name', 'Values': ['le-boulderserver']}, - {'Name': 'instance-state-name', 'Values': ['running']}]) - -boulder_server = next(iter(boulder_servers), None) - -print("Requesting Instances...") -if boulder_server: - print("Found existing boulder server:", boulder_server) - boulder_preexists = True -else: - print("Can't find a boulder server, starting one...") - boulder_server = make_instance('le-boulderserver', - BOULDER_AMI, - KEYNAME, - machine_type='t2.micro', - #machine_type='t2.medium', - security_group_id=security_group_id, - subnet_id=subnet_id) - -try: - if not cl_args.boulderonly: - instances = create_client_instances(targetlist, security_group_id, subnet_id) - - # Configure and launch boulder server - #------------------------------------------------------------------------------- - print("Waiting on Boulder Server") - boulder_server = block_until_instance_ready(boulder_server) - print(" server %s"%boulder_server) + # launch as many processes as clients to test + num_processes = len(targetlist) + jobs = [] #keep a reference to current procs - # env.host_string defines the ssh user and host for connection - env.host_string = "ubuntu@%s"%boulder_server.public_ip_address - print("Boulder Server at (SSH):", env.host_string) - if not boulder_preexists: - print("Configuring and Launching Boulder") - config_and_launch_boulder(boulder_server) - # blocking often unnecessary, but cheap EC2 VMs can get very slow - block_until_http_ready('http://%s:4000'%boulder_server.public_ip_address, - wait_time=10, timeout=500) + # initiate process execution + for i in range(num_processes): + p = mp.Process(target=test_client_process, args=(inqueue, outqueue, boulder_url)) + jobs.append(p) + p.daemon = True # kills subprocesses if parent is killed + p.start() - boulder_url = "http://%s:4000/directory"%boulder_server.private_ip_address - print("Boulder Server at (public ip): http://%s:4000/directory"%boulder_server.public_ip_address) - print("Boulder Server at (EC2 private ip): %s"%boulder_url) + # fill up work queue + for ii, target in enumerate(targetlist): + inqueue.put((ii, instances[ii].id, target)) - if cl_args.boulderonly: - sys.exit(0) + # add SENTINELs to end client processes + for i in range(num_processes): + inqueue.put(SENTINEL) + # wait on termination of client processes + for p in jobs: + p.join() + # add SENTINEL to output queue + outqueue.put(SENTINEL) - # Install and launch client scripts in parallel - #------------------------------------------------------------------------------- - print("Uploading and running test script in parallel: %s"%cl_args.test_script) - print("Output routed to log files in %s"%LOGDIR) - # (Advice: always use Manager.Queue, never regular multiprocessing.Queue - # the latter has implementation flaws that deadlock it in some circumstances) - manager = Manager() - outqueue = manager.Queue() - inqueue = manager.Queue() - SENTINEL = None #queue kill signal + # clean up + execute(local_repo_clean) - # launch as many processes as clients to test - num_processes = len(targetlist) - jobs = [] #keep a reference to current procs + # print and save summary results + results_file = open(LOGDIR+'/results', 'w') + outputs = [outq for outq in iter(outqueue.get, SENTINEL)] + outputs.sort(key=lambda x: x[0]) + for outq in outputs: + ii, target, status = outq + print('%d %s %s'%(ii, target['name'], status)) + results_file.write('%d %s %s\n'%(ii, target['name'], status)) + if len(outputs) != num_processes: + failure_message = 'FAILURE: Some target machines failed to run and were not tested. ' +\ + 'Tests should be rerun.' + print(failure_message) + results_file.write(failure_message + '\n') + results_file.close() + + finally: + cleanup(cl_args, instances, targetlist) + + # kill any connections + fabric.network.disconnect_all() - # initiate process execution - for i in range(num_processes): - p = mp.Process(target=test_client_process, args=(inqueue, outqueue)) - jobs.append(p) - p.daemon = True # kills subprocesses if parent is killed - p.start() - - # fill up work queue - for ii, target in enumerate(targetlist): - inqueue.put((ii, target)) - - # add SENTINELs to end client processes - for i in range(num_processes): - inqueue.put(SENTINEL) - # wait on termination of client processes - for p in jobs: - p.join() - # add SENTINEL to output queue - outqueue.put(SENTINEL) - - # clean up - execute(local_repo_clean) - - # print and save summary results - results_file = open(LOGDIR+'/results', 'w') - outputs = [outq for outq in iter(outqueue.get, SENTINEL)] - outputs.sort(key=lambda x: x[0]) - for outq in outputs: - ii, target, status = outq - print('%d %s %s'%(ii, target['name'], status)) - results_file.write('%d %s %s\n'%(ii, target['name'], status)) - if len(outputs) != num_processes: - failure_message = 'FAILURE: Some target machines failed to run and were not tested. ' +\ - 'Tests should be rerun.' - print(failure_message) - results_file.write(failure_message + '\n') - results_file.close() - -finally: - cleanup(cl_args, instances, targetlist) - - # kill any connections - fabric.network.disconnect_all() +if __name__ == '__main__': + main()