3

I wrote a lambda script to manage the life cycle of Amazon Machine Images using python and boto3. The script is working fine, but when I realized that I had to write unit tests for it, my nightmare began. I am not a develop and I am used to write scripts as SysAdmin.

I have already created unit tests for functions with return state like the following below and I works fine.

def get_interface_wrapper(region, service, interface_type):
    interface_types = ['client', 'resource']
    interface = None

    if (type(region) == str) and (type(service) == str) and (type(interface_type) == str) and (interface_type in interface_types):
        interface = ("boto3." + interface_type +
                     "(" + "service_name=service," + "region_name=region)")

    return interface

def get_interface(region, service, interface_type):
    return eval(get_interface_wrapper(region, service, interface_type))

#Unit tests
def test_get_interface_client(self):

    service = 'ec2'
    interface_expression = 'boto3.client(service_name=service,region_name=region)'
    client_interface = get_interface_wrapper(
        self.region, service, 'client')
    self.assertEqual(client_interface, interface_expression)


def test_get_interface_resource(self):

    service = 'ec2'
    interface_expression = 'boto3.resource(service_name=service,region_name=region)'
    resource_interface = get_interface_wrapper(
        self.region, service, 'resource')
    self.assertEqual(resource_interface, interface_expression)

However, for the following functions that have no return statement and rely on AWS endpoint, I am struggling to wrap my head around it. How I can a mock the endpoint or how I can change my code to create a unit test that does not rely on AWS endpoints.

def update_states(actions, ec2_client, logs_client, log_group, log_stream, dryrun_enabled=True):
    for action in actions:

        action.update({'phase': 'planning', 'PlanningTime': datetime.utcnow(
        ).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'})
        put_log_events(logs_client, log_group, log_stream, [action])

        # The tag packer_ami_state_tagging_date is not set
        if (action['is_timestamp_present'] == True):

            if (action['action'] == 'update'):
                # The tag packer_ami_state_tagging_date is set, so update the state and tagging date
                try:

                    ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[{'Key': 'packer_ami_state', 'Value': action['new_packer_ami_state']},
                                                                                                 {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI state and tagging date was updated'}, ]

                except Exception as e:

                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI state and tagging date was not updated', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

            if (action['action'] == 'delete'):
                image = ec2_client.Image(action['ImageId'])
                snapshots = []
                for blockDevMapping in image.block_device_mappings:
                    if 'Ebs' in blockDevMapping:
                        snapshots.append(blockDevMapping['Ebs']['SnapshotId'])

                try:
                    image.deregister(DryRun=dryrun_enabled)
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI was deregistered'}, ]

                except Exception as e:
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI was not deregistered', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

                counter = 1
                for snapshotID in snapshots:
                    snapshot = ec2_client.Snapshot(snapshotID)

                    try:
                        snapshot.delete(DryRun=dryrun_enabled)
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'SnapShot deleted', 'SnapShotID': snapshotID}, ]

                    except Exception as e:
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'SnapShot not deleted', 'Error': e.args[0], 'SnapShotID': snapshotID}, ]

                    finally:
                        put_log_events(logs_client, log_group,
                                       log_stream, operation_result)

                    counter += 1

            if (action['action'] == 'none'):
                action.update(
                    {'OperationDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'OperationResult': 'No action'})

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'No action'}, ]

                put_log_events(logs_client, log_group,
                               log_stream, operation_result)

        else:
            try:
                ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[
                    {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'Tag created'}, ]

            except Exception as e:
                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'Tag not created', 'Error': e.args[0], }]

            finally:
                put_log_events(logs_client, log_group,
                               log_stream, operation_result)


def put_log_events(client, log_group_name, log_stream_name, log_events):
    log_stream = client.describe_log_streams(
        logGroupName=log_group_name,
        logStreamNamePrefix=log_stream_name
    )

    if (bool(log_stream['logStreams'])) and ('uploadSequenceToken' in log_stream['logStreams'][0]):
        response = {
            'nextSequenceToken': log_stream['logStreams'][0]['uploadSequenceToken']}
    else:
        response = {}

    for log_event in log_events:
        if bool(response):
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
                sequenceToken=response['nextSequenceToken']
            )
        else:
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
            )
1
  • 6
    You should split update_states in small functions that do only one things, from there unitest will be much easier. So focus on refactoring your code first. Commented Oct 31, 2019 at 23:26

1 Answer 1

1

I recommend you use patching in the built-in unittest.mock library. I use that to mock out all boto3 calls so I never hit a real AWS service. There are many options, but here is a simple example that mocks out the client.

Suppose you have code in a module named "my_code" that imports boto3 and makes calls to a "ssm" boto3 client to the get_parameters_by_path function. You might mock that out with code such as this:

from unittest.mock import patch, MagicMock

...

@patch('my_app.my_code.boto3')
def test_secrets_load_ssm(self, mock_boto):
    mock_client = MagicMock()
    mock_boto.client.return_value = mock_client
    mock_client.get_parameters_by_path.return_value = helper_function()

    my_param = my_code.my_function_being_tested_that_fetches_a_parameter('/TEST_APP/CI/secure_string_test')

    self.assertEqual(my_param, 'secure string value')


def helper_function():
    return {'Parameters': [{'Name': '/TEST_APP/CI/secure_string_test',
                            'Type': 'SecureString',
                            'Value': 'secure string value',
                            'Version': 1,
                            'LastModifiedDate': datetime.datetime(2019, 8, 8, 14, 44, 26, 878000, tzinfo=datetime.timezone.utc),
                            'ARN': 'arn:aws:ssm:us-east-1:999478573200:parameter/TEST_APP/CI/secure_string_test'}], ResponseMetadata': {'RequestId': 'b9f016a4-485d-80d2-a504-015d081d8603',
                                 'HTTPStatusCode': 200,
                                 'HTTPHeaders': {'x-amzn-requestid': 'b9f016a4-475d-40d2-a504-015d981d8603',
                                                 'content-type': 'application/x-amz-json-1.1',
                                                 'content-length': '666',
                                                 'date': 'Fri, 30 Aug 2019 16:57:17 GMT'},
                                 'RetryAttempts': 0}
            }

I put the mocked-up return value in a separate helper function since that is not the point of this example and be whatever JSON you need boto3 to mock-up as a return. If you aren't familiar with unittest mock and patching you'll have to ramp a bit on using it, but having done so myself I can attest that it will solve these kinds of boto3 unit test problems much more elegantly.

The @patch annotation lets you swap out the real boto3 library with mock-up calls you create. The annotation declares which imported function you are patching, and that requires a corresponding variable in the function signature (mock_boto in this example). The next couple line that set up an object to be returned when the code in the function I am testing calls boto3.client(), and then the following line sets up what should be returned when the code calls the get_parameters_by_path function of the client object. Patching has functions like assert_called_once on your mocks to validate that the function was called as expected, so even if the function doesn't return anything you can mock it out.

Sign up to request clarification or add additional context in comments.

1 Comment

Shawn, thank you for your replay. I will test your idea a let you know.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.