Computer Vision for Medical Imaging: Part 4. SageMaker Pipelines

This notebook is the final part of a 4-part series on the techniques and services offered by SageMaker to build a model that predicts whether an image of cells contains cancer. This notebook describes how to automate the ML workflow using SageMaker Pipelines.

Dataset

The dataset for this demo comes from the Camelyon16 Challenge, made available under the CC0 license. The raw data provided by the challenge has been processed into 96x96 pixel tiles by Bas Veeling and is also made available under the CC0 license. For detailed information on each dataset, please see the papers below:

* Ehteshami Bejnordi et al. Diagnostic Assessment of Deep Learning Algorithms for Detection of Lymph Node Metastases in Women With Breast Cancer. JAMA: The Journal of the American Medical Association, 318(22), 2199–2210. doi:10.1001/jama.2017.14585
* B. S. Veeling, J. Linmans, J. Winkens, T. Cohen, M. Welling. “Rotation Equivariant CNNs for Digital Pathology”. arXiv:1806.03962

The tiled dataset from Bas Veeling is over 6GB of data. In order to easily run this demo, the dataset has been pruned to the first 14,000 images of the tiled dataset and comes included in the repo with this notebook for convenience.

Update Sagemaker SDK and Boto3

NOTE: You may get an error from pip’s dependency resolver; you can ignore this error.

[ ]:
# Reload every variable previously persisted with IPython's `%store` magic.
# Names used below but never assigned in this notebook (`prefix`,
# `num_training_samples`, `mpg_name`, `deploy_instance_type`) presumably come
# from here — confirm they were stored by the earlier notebooks in the series.
%store -r
# With no arguments, `%store` lists what is currently held in the store.
%store

Import Libraries

[ ]:
import boto3
import sagemaker
import numpy as np
import matplotlib.pyplot as plt
import cv2

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

Configure Boto3 Clients and Sessions

[ ]:
# Region that every client and session in this notebook is pinned to.
region = "us-west-2"  # Change region as needed

# Also set boto3's module-level default session, so SDK objects that are not
# handed an explicit session below still resolve to `region`.
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)

# Build both service clients from the single explicit boto3 session so they
# share the same region and credentials.
s3_client = boto_session.client("s3")
sagemaker_boto_client = boto_session.client("sagemaker")

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)
sagemaker_role = sagemaker.get_execution_role(sagemaker_session=sagemaker_session)

# Resolve the default bucket through the session configured above instead of
# constructing a second, throwaway `sagemaker.Session()` just for this lookup.
bucket = sagemaker_session.default_bucket()

Configure the Estimator

[ ]:
# Container image of SageMaker's built-in image-classification algorithm for
# the configured region.
training_image = sagemaker.image_uris.retrieve("image-classification", region)

# Hyperparameters passed to the built-in image-classification algorithm.
hyperparameters = {
    "num_layers": 18,
    "use_pretrained_model": 1,
    "augmentation_type": "crop_color_transform",
    # channels,height,width — the dataset consists of 96x96 pixel tiles.
    "image_shape": "3,96,96",
    "num_classes": 2,
    # `num_training_samples` is not assigned in this notebook; it is expected
    # to have been restored by `%store -r`.
    "num_training_samples": num_training_samples,
    "mini_batch_size": 64,
    "epochs": 5,
    "learning_rate": 0.01,
    "precision_dtype": "float32",
}

estimator_config = {
    "hyperparameters": hyperparameters,
    "image_uri": training_image,
    # Reuse the role and session resolved during client setup rather than
    # looking the role up again and letting the estimator create its own
    # implicit session; this keeps it consistent with the Model and the
    # deploy processor, which both receive `sagemaker_session`.
    "role": sagemaker_role,
    "sagemaker_session": sagemaker_session,
    "instance_count": 1,
    "instance_type": "ml.p3.2xlarge",
    "volume_size": 100,  # GB of storage attached to the training instance
    "max_run": 360000,  # training time limit, in seconds
    "output_path": f"s3://{bucket}/{prefix}/training_jobs",
}

image_classifier = sagemaker.estimator.Estimator(**estimator_config)

Pipeline

Step 1: Create RecordIO Splits

[ ]:
# S3 location under which the pipeline's raw input is kept.
base_uri = f"s3://{bucket}/{prefix}/data"

# Copy the local HDF5 tile archive to S3 and keep the URI of the uploaded object.
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path="data/camelyon16_tiles.h5",
    desired_s3_uri=base_uri,
)

# Expose the dataset location as a pipeline parameter; it defaults to the
# object uploaded above but can be overridden per execution.
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
[ ]:
# Stage the splitting script in S3 so the processing job can fetch it.
s3_client.upload_file(
    Filename="split_data.py",
    Bucket=bucket,
    Key=f"{prefix}/code/split_data.py",
)
split_data_script_uri = f"s3://{bucket}/{prefix}/code/split_data.py"
split_data_instance_type = "ml.t3.large"

# scikit-learn processing container that runs split_data.py.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=split_data_instance_type,
    instance_count=1,
    base_job_name="image-classication-split-data",
    role=sagemaker_role,
)

# The dataset (pipeline parameter `InputData`) is mounted into the container
# at the destination path below.
split_data_inputs = [
    sagemaker.processing.ProcessingInput(
        source=input_data, destination="/opt/ml/processing/input"
    ),
]

# One named output per split; later steps look these up by `output_name`.
split_data_outputs = [
    sagemaker.processing.ProcessingOutput(
        output_name="train_data", source="/opt/ml/processing/output/data/train"
    ),
    sagemaker.processing.ProcessingOutput(
        output_name="val_data", source="/opt/ml/processing/output/data/val"
    ),
    sagemaker.processing.ProcessingOutput(
        output_name="test_data", source="/opt/ml/processing/output/data/test"
    ),
]

split_data_step = ProcessingStep(
    name="SplitData",
    processor=sklearn_processor,
    inputs=split_data_inputs,
    outputs=split_data_outputs,
    code=split_data_script_uri,
)

Step 2: Train Model

[ ]:
def _recordio_channel(output_name):
    """Build a Pipe-mode RecordIO training channel from a SplitData output.

    `output_name` is the `output_name` of one of the SplitData step's
    processing outputs; its S3 URI is referenced as a step property, so it is
    resolved when the pipeline runs.
    """
    split_outputs = split_data_step.properties.ProcessingOutputConfig.Outputs
    return sagemaker.inputs.TrainingInput(
        s3_data=split_outputs[output_name].S3Output.S3Uri,
        content_type="application/x-recordio",
        s3_data_type="S3Prefix",
        input_mode="Pipe",
    )


# Channel names expected by the estimator, each fed from the matching split.
train_step_inputs = {
    "train": _recordio_channel("train_data"),
    "validation": _recordio_channel("val_data"),
}

train_step = TrainingStep(
    name="TrainModel",
    estimator=image_classifier,
    inputs=train_step_inputs,
)

Step 3: Register Model

[ ]:
# Approval status recorded on the registered model package; exposed as a
# pipeline parameter so an execution can override the default.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# Keyword arguments for the registration step, gathered in one place.
register_step_args = {
    "name": "RegisterModel",
    "estimator": image_classifier,
    # Artifacts produced by the TrainModel step, referenced as a step property.
    "model_data": train_step.properties.ModelArtifacts.S3ModelArtifacts,
    "content_types": ["image/jpeg"],
    "response_types": ["text/csv"],
    "inference_instances": ["ml.t2.medium", "ml.m5.xlarge"],
    "transform_instances": ["ml.m5.xlarge"],
    # `mpg_name` is not assigned in this notebook; expected from `%store -r`.
    "model_package_group_name": mpg_name,
    "approval_status": model_approval_status,
}

register_step = RegisterModel(**register_step_args)

Step 4: Create Model

[ ]:
# SageMaker Model built from the training step's outputs; both the container
# image and the artifact location are step properties of TrainModel.
model = sagemaker.model.Model(
    name=f"{mpg_name}-pipline",
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=sagemaker_role,
)

# Instance type passed to the model-creation step.
inputs = sagemaker.inputs.CreateModelInput(
    instance_type="ml.m4.xlarge",
)

create_model_step = CreateModelStep(
    name="ModelPreDeployment",
    model=model,
    inputs=inputs,
)

Step 5: Deploy Model

[ ]:
# Stage the deployment script in S3 so the processing job can fetch it.
s3_client.upload_file(
    Filename="deploy_model.py",
    Bucket=bucket,
    Key=f"{prefix}/code/deploy_model.py",
)
deploy_model_script_uri = f"s3://{bucket}/{prefix}/code/deploy_model.py"

# Small processing job whose only purpose is to run deploy_model.py.
deploy_model_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sagemaker_role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name=f"{prefix}-deploy-model",
    sagemaker_session=sagemaker_session,
)

# Command-line arguments handed to deploy_model.py, as flag/value pairs.
# The model name is a step property of ModelPreDeployment;
# `deploy_instance_type` is not assigned in this notebook (expected from
# `%store -r`).
deploy_job_arguments = [
    "--model-name",
    create_model_step.properties.ModelName,
    "--region",
    region,
    "--endpoint-instance-type",
    deploy_instance_type,
    "--endpoint-name",
    "cv-model-pipeline",
]

deploy_step = ProcessingStep(
    name="DeployModel",
    processor=deploy_model_processor,
    job_arguments=deploy_job_arguments,
    code=deploy_model_script_uri,
)

Create Pipeline

[ ]:
pipeline_name = f"{prefix}-pipeline"

# Parameters that can be overridden when an execution is started.
pipeline_parameters = [input_data, model_approval_status]

# All steps of the workflow: split, train, register, create model, deploy.
pipeline_steps = [
    split_data_step,
    train_step,
    register_step,
    create_model_step,
    deploy_step,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=pipeline_steps,
)

# Create the pipeline definition in SageMaker, or update it if it already exists.
pipeline.upsert(role_arn=sagemaker_role)
[ ]:
# Override the default approval status for this execution.
parameters = dict(ModelApprovalStatus="Approved")

# Start the pipeline with the override applied.
start_response = pipeline.start(parameters=parameters)

# Block until the execution finishes, polling at most 100 times.
start_response.wait(max_attempts=100)

# Last expression of the cell: the execution description is shown as cell output.
start_response.describe()

Lineage

Review the lineage of the artifacts generated by the pipeline.

[ ]:
import time
from pprint import pprint

from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker_session)

# Visit the execution's steps in the reverse of the order `list_steps()`
# returns them, printing each step and rendering its lineage table.
execution_steps = start_response.list_steps()
for execution_step in reversed(execution_steps):
    pprint(execution_step)
    lineage_table = viz.show(pipeline_execution_step=execution_step)
    display(lineage_table)
    # Pause between steps before querying lineage for the next one.
    time.sleep(5)

Clean up resources

[ ]:
# Delete the endpoint that this pipeline's DeployModel step was asked to create.
# That step passes "--endpoint-name cv-model-pipeline" to deploy_model.py, so
# that is the name removed here (assumes the script names the endpoint from
# that argument — confirm against deploy_model.py). The notebook's own
# `sagemaker_session` is used because `best_model` is not assigned anywhere in
# this notebook.
sagemaker_session.delete_endpoint("cv-model-pipeline")
[ ]: