Pipeline Dreams: Automating ML Training on AWS

In the world of machine learning, automated training pipelines streamline the journey from data to insight. They automate stages of the machine learning life cycle such as data ingestion, preprocessing, model training, evaluation, and deployment. Amazon Web Services ("AWS") provides various tools for building an automated training pipeline. In this article, we will walk through setting up a basic automated training pipeline using the classic Iris dataset.

Setting the Stage: Requirements and AWS Toolkit


In this section, we will cover some high level requirements as well as a brief overview of the AWS tools we will use.

Requirements

If you choose to follow along by building your own training pipeline, you will need the following.

  • An active AWS account with Administrator Access (you can sign up on the AWS website)
  • Basic knowledge of the AWS CLI (we will explore alternatives to the AWS CLI in future posts)

Setting up your AWS account and connecting to AWS via the CLI are beyond the scope of this post; however, feel free to reach out directly if you need help.

Toolkit

Setting up the automated training pipeline will require the use of the following tools. All of them except Docker are AWS products.

  • S3: Scalable object storage service designed to store and retrieve any amount of data from anywhere on the web
  • Lambda: Serverless compute service that automatically runs your code in response to events, such as changes to data in an Amazon S3 bucket
  • ECR: Fully managed container registry where we will store the Docker images behind our Lambda functions and SageMaker jobs
  • Docker: Platform that packages, distributes, and manages applications inside lightweight, portable containers
  • SageMaker: Fully managed service that provides developers and data scientists with the ability to build, train, and deploy machine learning models quickly and easily
  • Step Functions: Serverless workflow service that lets you coordinate distributed applications and microservices using visual workflows, enabling you to build, run, and visualize complex processes at scale

Implementing the Automated Training Pipeline

Assuming you made it past the requirements, we can turn our attention to building our automated training pipeline. For this simple example, we will focus on the following steps.

  1. Create AWS S3 bucket to store data and artifacts related to our training pipeline.
  2. Create AWS Lambda functions for data ingestion, preprocessing, and training.
  3. Create an AWS Step Functions state machine to orchestrate the execution of our pipeline stages.

We take a deep dive into each of these steps below.

S3 Bucket

The first thing we will do is create a new S3 bucket to store data and artifacts using the AWS CLI.

aws s3api create-bucket \
  --bucket automated-training-pipeline \
  --region us-east-1

If this command runs successfully, it should output something like this in your terminal.

{
    "Location": "/automated-training-pipeline"
}

Taking a look at our management console, we can see the S3 bucket was successfully created.


Lambda Functions

For our automated training pipeline, we will rely heavily on Lambda functions to execute and trigger certain parts of our process. Specifically, we will use Lambda functions for:

  • Data ingestion
  • Data preprocessing
  • Model training

Data Ingestion

Lambda functions require handlers. The handler is a user-defined function in a Lambda deployment package that the AWS Lambda service invokes when it executes the function. The handler receives and processes the event data from the invoker.

With that in mind, let's define our lambda_handler for data ingestion. The lambda_handler function serves as the entry point for AWS Lambda to execute the code; in this case, it retrieves the Iris CSV file from a URL and uploads it to our S3 bucket. We store this handler in ./src/data/fetch_iris_data.py.

import boto3
import requests

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    # The classic Iris dataset, hosted by the UCI Machine Learning Repository
    data_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
    response = requests.get(data_url)
    response.raise_for_status()

    # Assuming the data is small enough to fit into memory
    s3.put_object(
        Bucket='automated-training-pipeline',
        Key='data/iris.data',
        Body=response.content
    )

    return {'statusCode': 200, 'body': 'Iris data uploaded to S3'}
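
Before containerizing the handler, it can be useful to smoke-test it locally. Here is a minimal sketch, assuming your local AWS credentials can write to the bucket; append it to the bottom of fetch_iris_data.py.

# Local smoke test (assumes AWS credentials are configured locally)
if __name__ == "__main__":
    print(lambda_handler({}, None))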

We will package and deploy our AWS Lambda function as a container image using Docker. To do so, we first need to create an ECR repository to house our image.

aws ecr create-repository --repository-name fetch-iris-data --region us-east-1

Now that we have our ECR repository stood up, we create our Dockerfile in the root of our project directory.

# Use a base image with Python 3.11
FROM public.ecr.aws/lambda/python:3.11

# Install dependencies (if any)
RUN pip install requests boto3

# Copy your function code from the src/data directory to the Lambda task root directory
COPY src/data/fetch_iris_data.py ${LAMBDA_TASK_ROOT}

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD ["fetch_iris_data.lambda_handler"]

We use this Dockerfile to build our image and push it to the ECR registry.

docker build -t fetch-iris-data .

We need to authenticate Docker to the ECR registry.

aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <your-account-id>.dkr.ecr.<region>.amazonaws.com

Next, we tag the image to match the repository.

docker tag fetch-iris-data:latest <your-account-id>.dkr.ecr.<region>.amazonaws.com/fetch-iris-data:latest

Now, we can push our image to the ECR registry.

docker push <your-account-id>.dkr.ecr.<region>.amazonaws.com/fetch-iris-data:latest

Finally, we will create the Lambda function and point it at the container image.

aws lambda create-function \
    --function-name <function-name> \
    --package-type Image \
    --code ImageUri=<your-account-id>.dkr.ecr.<region>.amazonaws.com/<repository-name>:<tag> \
    --role arn:aws:iam::<your-account-id>:role/<role-name> \
    --region <region>

You will now be able to see the Lambda function in your AWS management console. We can invoke the function and confirm the file was downloaded to the S3 bucket correctly.

aws lambda invoke --function-name fetch-iris-data-function --payload '{}' output.txt

aws s3 ls s3://automated-training-pipeline/data/

#2023-10-24 13:52:41      4551 iris.data

Preprocessing

Now we have the data we need for our modeling task. For most machine learning tasks, we will want to process that data in some way. In preprocess.py, shown below, we define a script that will process our Iris data.

import argparse
import os
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split


def preprocess(input_data_path, output_data_path):
    # The raw Iris file has no header row, so we supply column names ourselves
    column_names = [
        "sepal_length",
        "sepal_width",
        "petal_length",
        "petal_width",
        "species",
    ]
    feature_columns = column_names[:-1]
    df = pd.read_csv(input_data_path, header=None, names=column_names)

    # Drop rows with a missing label
    df = df.dropna(subset=["species"])

    # Encode the string labels as integers (0, 1, 2)
    encoder = LabelEncoder()
    df["species"] = encoder.fit_transform(df["species"])

    # Hold out 20% of the rows for testing
    train, test = train_test_split(df, test_size=0.2, random_state=42)

    # Impute missing values with training-set means only, so nothing leaks
    # from the test split into the training data
    mean_train = train.mean()
    train.fillna(mean_train, inplace=True)
    test.fillna(mean_train, inplace=True)

    # Standardize features using statistics fit on the training split
    scaler = StandardScaler()
    scaler.fit(train[feature_columns])
    train[feature_columns] = scaler.transform(train[feature_columns])
    test[feature_columns] = scaler.transform(test[feature_columns])

    # SageMaker's built-in algorithms expect CSV input with no header row and
    # the label in the first column, so we move species to the front
    train = train[["species"] + feature_columns]
    test = test[["species"] + feature_columns]

    train.to_csv(
        os.path.join(output_data_path, "train_data.csv"), header=False, index=False
    )
    test.to_csv(
        os.path.join(output_data_path, "test_data.csv"), header=False, index=False
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str)
    parser.add_argument("--output-data", type=str)
    args = parser.parse_args()

    preprocess(args.input_data, args.output_data)
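
Before handing the script to SageMaker, you can sanity-check it locally. A quick sketch, assuming iris.data has been downloaded to the current directory (both paths are hypothetical):

from preprocess import preprocess

# Writes train_data.csv and test_data.csv to the current directory
preprocess("iris.data", ".")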

We upload our preprocessing script to S3. Run this command in the same directory as preprocess.py.

aws s3 cp preprocess.py s3://automated-training-pipeline/modeling/preprocess.py

Similar to how we built an image for the data ingestion step, we will build an image for the preprocessing step. Like before, let’s start by creating our repository.

aws ecr create-repository --repository-name preprocess-iris-data --region <region>

Now that we have this ECR repository stood up, we create a Dockerfile for the preprocessing image.

# Use a standard Python base image
FROM python:3.11-slim-buster

# Install necessary libraries: requests, boto3, scikit-learn, pandas, numpy, etc.
RUN pip install requests boto3 scikit-learn pandas numpy

# No need to copy any Python script here since it will be provided from S3 at runtime

# Set working directory
WORKDIR /app

# Set a default CMD or ENTRYPOINT in case you want to run the container for testing, but this isn't strictly necessary
# since the Processing job will override this with ContainerEntrypoint from your Lambda function
CMD ["python", "-c", "print('Container started successfully')"]

We build the image.

docker build -t preprocess-iris-data .

Let’s authenticate Docker to the ECR registry. You might not have to do this if your access hasn’t expired.

aws ecr get-login-password --region <region> | docker login --username AWS --password-stdin <your-account-id>.dkr.ecr.<region>.amazonaws.com

Again, we tag the image to match the repository.

docker tag preprocess-iris-data:latest <your-account-id>.dkr.ecr.<region>.amazonaws.com/preprocess-iris-data:latest

Now, we can push our image to the ECR registry.

docker push <your-account-id>.dkr.ecr.<region>.amazonaws.com/preprocess-iris-data:latest

Like before, we will create a Lambda function, this time one that kicks off a SageMaker processing job. We define our Lambda handler below.

import boto3
import datetime


def lambda_handler(event, context):
    sagemaker_client = boto3.client("sagemaker")

    # Parameters for the processing job
    job_name = "iris-preprocessing-job-" + datetime.datetime.now().strftime(
        "%Y%m%d%H%M%S"
    )
    role_arn = "arn:aws:iam::<your-account-id>:role/service-role/<role>"
    image_uri = (
        "<your-account-id>.dkr.ecr.<region>.amazonaws.com/preprocess-iris-data:latest"
    )
    input_s3_uri = "s3://automated-training-pipeline/data/iris.data"
    preprocess_s3_uri = "s3://automated-training-pipeline/modeling/preprocess.py"
    output_s3_uri = "s3://automated-training-pipeline/data/"

    response = sagemaker_client.create_processing_job(
        ProcessingJobName=job_name,
        RoleArn=role_arn,
        ProcessingInputs=[
            {
                "InputName": "input-1",
                "S3Input": {
                    "S3Uri": input_s3_uri,
                    "LocalPath": "/opt/ml/processing/input",
                    "S3DataType": "S3Prefix",
                    "S3InputMode": "File",
                    "S3DataDistributionType": "FullyReplicated",
                    "S3CompressionType": "None",
                },
            },
            {
                "InputName": "code",
                "S3Input": {
                    "S3Uri": preprocess_s3_uri,
                    "LocalPath": "/opt/ml/processing/code",
                    "S3DataType": "S3Prefix",
                    "S3InputMode": "File",
                    "S3DataDistributionType": "FullyReplicated",
                    "S3CompressionType": "None",
                },
            },
        ],
        ProcessingOutputConfig={
            "Outputs": [
                {
                    "OutputName": "output-1",
                    "S3Output": {
                        "S3Uri": output_s3_uri,
                        "LocalPath": "/opt/ml/processing/output",
                        "S3UploadMode": "EndOfJob",
                    },
                }
            ]
        },
        ProcessingResources={
            "ClusterConfig": {
                "InstanceCount": 1,
                "InstanceType": "ml.t3.medium",
                "VolumeSizeInGB": 5,
            }
        },
        AppSpecification={
            "ImageUri": image_uri,
            "ContainerArguments": [
                "--input-data",
                "/opt/ml/processing/input/iris.data",
                "--output-data",
                "/opt/ml/processing/output",
            ],
            "ContainerEntrypoint": ["python3", "/opt/ml/processing/code/preprocess.py"],
        },
    )

    return {"statusCode": 200, "body": response}
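
One caveat: create_processing_job returns as soon as the job is created, not when it finishes. If a downstream step needs the processed data, you can poll the job status with describe_processing_job. A minimal sketch (the helper name and polling interval are our own, and remember that a single Lambda invocation is capped at 15 minutes):

import time
import boto3

sagemaker_client = boto3.client("sagemaker")

def wait_for_processing_job(job_name, poll_seconds=30):
    # Poll until the processing job reaches a terminal state
    while True:
        status = sagemaker_client.describe_processing_job(
            ProcessingJobName=job_name
        )["ProcessingJobStatus"]
        if status in ("Completed", "Failed", "Stopped"):
            return status
        time.sleep(poll_seconds)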

We zip up our Lambda handler and store it in S3.

zip lambda_function_preprocess.zip lambda_handler_preprocess.py

aws s3 cp lambda_function_preprocess.zip s3://automated-training-pipeline/modeling/lambda_function_preprocess.zip

Finally, we create our lambda function.

aws lambda create-function \
    --function-name iris-preprocessing-trigger \
    --runtime python3.11 \
    --role arn:aws:iam::<your-account-id>:role/<role> \
    --handler lambda_handler_preprocess.lambda_handler \
    --code S3Bucket=automated-training-pipeline,S3Key=modeling/lambda_function_preprocess.zip \
    --memory-size 256 \
    --timeout 900 

Since we haven't strung the Lambda functions together yet, we can manually invoke the Lambda and check the contents of S3 to ensure our training and testing datasets are there.

aws lambda invoke --function-name iris-preprocessing-trigger --payload '{}' output.txt

aws s3 ls s3://automated-training-pipeline/data/

#2023-10-24 14:50:46      4551 iris.data
#2023-10-24 17:08:42      2404 test_data.csv
#2023-10-24 17:08:42      9635 train_data.csv

Model training

Instead of creating our own image for training, we will use SageMaker's built-in Linear Learner image, which supports multiclass classification. (You can still create your own image if you want a customized training process.) Since there is no image to build, we can write our Lambda handler right away.

import boto3
import datetime


def lambda_handler(event, context):
    sagemaker_client = boto3.client("sagemaker")

    job_name = "iris-training-job-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    role_arn = "arn:aws:iam::<your-account-id>:role/service-role/<role>"  # Update the ARN as required

    image_uri = "382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:1"  # SageMaker's public Linear Learner image for us-east-1

    training_data_s3_uri = "s3://automated-training-pipeline/data/train_data.csv"
    output_s3_uri = "s3://automated-training-pipeline/models/"

    response = sagemaker_client.create_training_job(
        TrainingJobName=job_name,
        RoleArn=role_arn,
        AlgorithmSpecification={
            "TrainingImage": image_uri,
            "TrainingInputMode": "File",
        },
        InputDataConfig=[
            {
                "ChannelName": "train",
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": training_data_s3_uri,
                        "S3DataDistributionType": "FullyReplicated",
                    },
                },
                "ContentType": "text/csv",  # Specify that we're using CSV format
            },
        ],
        OutputDataConfig={
            "S3OutputPath": output_s3_uri,
        },
        ResourceConfig={
            "InstanceCount": 1,
            "InstanceType": "ml.m5.large",
            "VolumeSizeInGB": 5,
        },
        HyperParameters={
            "predictor_type": "multiclass_classifier",
            "num_classes": "3",
            "mini_batch_size": "30",
        },
        StoppingCondition={
            "MaxRuntimeInSeconds": 3600,
        },
    )

    # SageMaker writes the artifact to <output path>/<job name>/output/model.tar.gz
    model_artifact_uri = f"{output_s3_uri}{job_name}/output/model.tar.gz"

    return {
        "statusCode": 200,
        "body": {"trainingJobName": job_name, "modelArtifactUri": model_artifact_uri},
    }
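
Note that the artifact URI above is constructed by convention. The authoritative location is reported by SageMaker once the job completes; here is a small sketch of how you might look it up (the helper name is our own):

import boto3

sagemaker_client = boto3.client("sagemaker")

def get_model_artifact(job_name):
    # Available once the training job has completed
    description = sagemaker_client.describe_training_job(TrainingJobName=job_name)
    return description["ModelArtifacts"]["S3ModelArtifacts"]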

Before we can create our lambda function, we have to zip up our handler and send it to S3.

zip lambda_function_train.zip lambda_handler_train.py

aws s3 cp lambda_function_train.zip s3://automated-training-pipeline/modeling/lambda_function_train.zip

Now we can create our lambda function.

aws lambda create-function \
    --function-name iris-training-trigger \
    --runtime python3.11 \
    --role arn:aws:iam::<your-account-id>:role/<role> \
    --handler lambda_handler_train.lambda_handler \
    --code S3Bucket=automated-training-pipeline,S3Key=modeling/lambda_function_train.zip
Again, we can manually invoke our training Lambda to ensure it works correctly.

aws lambda invoke --function-name iris-training-trigger --payload '{}' output.txt

aws s3 ls s3://automated-training-pipeline/models/

#PRE iris-training-job-20231025015943/

Step Functions

Now we have a very basic pipeline consisting of data ingestion, preprocessing, and model training. However, each of these components exists on its own. How do we string them together to create an automated pipeline? Step Functions!

To string our Lambda functions together, the first thing we will do is define a state machine using the Amazon States Language and save it in a .json file so we can push it to AWS using the CLI. For our purposes, we will define our state machine in the following way. Keep in mind that each of our Lambdas returns as soon as its SageMaker job is created; a production pipeline would poll for job completion (as sketched in the preprocessing section) or use Step Functions' native SageMaker service integrations before moving to the next state.

{
  "Comment": "A simple AWS Step Functions state machine that orchestrates 3 Lambda functions.",
  "StartAt": "DataIngestion",
  "States": {
    "DataIngestion": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:<region>:<your-account-id>:function:fetch-iris-data-function",
      "Next": "Preprocessing"
    },
    "Preprocessing": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:<region>:<your-account-id>:function:iris-preprocessing-trigger",
      "Next": "Training"
    },
    "Training": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:<region>:<your-account-id>:function:iris-training-trigger",
      "End": true
    }
  }
}

We can now deploy our state machine to AWS.

aws stepfunctions create-state-machine \
    --name "automated-training-pipeline" \
    --definition file://pipeline.json \
    --role-arn "arn:aws:iam::<your-account-id>:role/<execution role>"

Once the state machine has been created, we can visualize it in the management console.

We can manually invoke the pipeline as follows.

aws stepfunctions start-execution \
    --state-machine-arn "arn:aws:states:us-east-1:<your-account-id>:stateMachine:<YourStateMachineName>"
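
To confirm the run completed, you can check the execution status. A sketch, using the execution ARN returned by start-execution:

import boto3

sfn = boto3.client("stepfunctions")

status = sfn.describe_execution(
    executionArn="<execution-arn-from-start-execution-output>"
)["status"]
print(status)  # RUNNING, SUCCEEDED, FAILED, TIMED_OUT, or ABORTED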

Additional Considerations & Conclusion


The pipeline we built in this example is extremely basic — we don't even consider model evaluation! In a real-world setting, this approach can be expanded to cover additional parts of the model-building lifecycle, including model evaluation and deployment.

Additionally, we built our pipeline but we have to trigger it manually. In the real world, you can trigger the execution of a Step Functions state machine based on various events. Using triggers provides a more event-driven architecture. Here are some common ways to set up triggers for your state machine:

  1. Amazon S3 Events: If you want to run your state machine when a new file is uploaded to an S3 bucket, you can set up an event within the bucket to trigger a Lambda function, which in turn starts the execution of your state machine (see the sketch after this list).
  2. AWS CodeCommit: If you’re using AWS CodeCommit as your repository, you can use AWS Lambda and Amazon CloudWatch Events to trigger the state machine whenever there’s a new commit.
  3. GitHub or Other Repositories: If you’re using a service like GitHub, you can use webhooks to notify an AWS service of a new commit or a pull request merge. Typically, you’d set up an API Gateway endpoint to receive the webhook, trigger a Lambda function from the API Gateway call, and then have the Lambda function start your state machine.
  4. Amazon DynamoDB Streams: If you want to trigger your state machine based on changes in a DynamoDB table, you can use DynamoDB Streams. Whenever there’s a change in the table, the stream can trigger a Lambda function, which then starts the state machine.
  5. AWS EventBridge (previously CloudWatch Events): AWS EventBridge allows you to create rules based on a wide range of AWS service events. You can target a Lambda function with these rules. Then, like in other scenarios, the Lambda function would start your state machine.
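
As a sketch of the first option, here is a minimal trigger Lambda that starts the state machine whenever an S3 event invokes it (the state machine ARN is a placeholder):

import json
import boto3

sfn = boto3.client("stepfunctions")

def lambda_handler(event, context):
    # Forward the S3 event to the state machine as its execution input
    response = sfn.start_execution(
        stateMachineArn="arn:aws:states:<region>:<your-account-id>:stateMachine:automated-training-pipeline",
        input=json.dumps(event),
    )
    return {"statusCode": 200, "executionArn": response["executionArn"]}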

In conclusion, an automated training pipeline in the context of machine learning and data science offers multiple advantages:

  1. Consistency and Reproducibility: Automation ensures that the training process remains consistent across different runs. This aids in reproducing results and eliminates variations due to manual interventions.
  2. Efficiency: Automated pipelines can streamline processes, reducing the time required for training and retraining models. This can be especially beneficial when iterating over different model architectures or parameters.
  3. Scalability: As your data or model complexity grows, an automated pipeline can scale up resources and processes without the need for manual oversight.
  4. Resource Optimization: Automation can manage resources more effectively, potentially leading to cost savings. For instance, cloud resources can be automatically scaled down when not in use.

Feel free to reach out with any questions!